Timeout for Subscription Create Partial Response 24/134124/12
authorlukegleeson <luke.gleeson@est.tech>
Thu, 6 Apr 2023 14:28:56 +0000 (15:28 +0100)
committerlukegleeson <luke.gleeson@est.tech>
Fri, 28 Apr 2023 11:22:37 +0000 (12:22 +0100)
- Implemented default 30s timeout for DMI Responses
- Placeholders have been TODO'd for Outcome Response generation and Persisted Subscription Updating
- Refactored common HazelcastCacheConfig methods
- Some tests required a blocking variable due to seperate thread usage

Issue-ID: CPS-1599
Signed-off-by: lukegleeson <luke.gleeson@est.tech>
Change-Id: I2b1a35e93939daa0524d379ac4736d714ef61a6f

15 files changed:
cps-application/src/main/resources/application.yml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfigSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy
cps-service/src/main/java/org/onap/cps/cache/AnchorDataCacheConfig.java
cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java [new file with mode: 0644]

index b95d9eb..1eb1c11 100644 (file)
@@ -99,6 +99,9 @@ app:
             topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
         avc:
             subscription-topic: ${NCMP_CM_AVC_SUBSCRIPTION:cm-avc-subscription}
+            subscription-forward-topic: ${NCMP_FORWARD_CM_AVC_SUBSCRIPTION:ncmp-dmi-cm-avc-subscription}
+            subscription-response-topic: ${NCMP_RESPONSE_CM_AVC_SUBSCRIPTION:dmi-ncmp-cm-avc-subscription}
+            subscription-outcome-topic: ${NCMP_OUTCOME_CM_AVC_SUBSCRIPTION:cm-avc-subscription-response}
             cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}
     lcm:
         events:
@@ -181,6 +184,10 @@ ncmp:
             sleep-time-ms: 300000
         cm-handle-data-sync:
             sleep-time-ms: 30000
+        subscription-forwarding:
+            dmi-response-timeout-ms: 30000
+        model-loader:
+            retry-time-ms: 1000
 
     modules-sync-watchdog:
         async-executor:
@@ -188,12 +195,11 @@ ncmp:
 
     model-loader:
         subscription: false
-        maximumAttemptCount: 20
-        retryTimeMs: 1000
+        maximum-attempt-count: 20
 
 # Custom Hazelcast Config.
 hazelcast:
-  mode:
-    kubernetes:
-      enabled: ${HAZELCAST_MODE_KUBERNETES_ENABLED:false}
-      service-name: ${CPS_NCMP_SERVICE_NAME:"cps-and-ncmp-service"}
\ No newline at end of file
+    mode:
+        kubernetes:
+            enabled: ${HAZELCAST_MODE_KUBERNETES_ENABLED:false}
+            service-name: ${CPS_NCMP_SERVICE_NAME:"cps-and-ncmp-service"}
\ No newline at end of file
index 0b67266..ff7afc9 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.config.embeddedcache;
 
-import com.hazelcast.config.Config;
 import com.hazelcast.config.MapConfig;
-import com.hazelcast.config.NamedConfig;
 import com.hazelcast.config.QueueConfig;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.map.IMap;
 import java.util.concurrent.BlockingQueue;
 import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.cache.HazelcastCacheConfig;
 import org.onap.cps.spi.model.DataNode;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -39,18 +35,12 @@ import org.springframework.context.annotation.Configuration;
  */
 @Slf4j
 @Configuration
-public class SynchronizationCacheConfig {
+public class SynchronizationCacheConfig extends HazelcastCacheConfig {
 
     public static final int MODULE_SYNC_STARTED_TTL_SECS = 600;
     public static final int DATA_SYNC_SEMAPHORE_TTL_SECS = 1800;
 
-    @Value("${hazelcast.mode.kubernetes.enabled}")
-    private boolean cacheKubernetesEnabled;
-
-    @Value("${hazelcast.mode.kubernetes.service-name}")
-    private String cacheKubernetesServiceName;
-
-    private static final QueueConfig commonQueueConfig = createQueueConfig();
+    private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig");
     private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig");
     private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig");
 
@@ -61,7 +51,8 @@ public class SynchronizationCacheConfig {
      */
     @Bean
     public BlockingQueue<DataNode> moduleSyncWorkQueue() {
-        return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig)
+        return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig,
+            "synchronization-caches")
             .getQueue("moduleSyncWorkQueue");
     }
 
@@ -72,7 +63,8 @@ public class SynchronizationCacheConfig {
      */
     @Bean
     public IMap<String, Object> moduleSyncStartedOnCmHandles() {
-        return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig)
+        return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig,
+            "synchronization-caches")
             .getMap("moduleSyncStartedOnCmHandles");
     }
 
@@ -83,48 +75,8 @@ public class SynchronizationCacheConfig {
      */
     @Bean
     public IMap<String, Boolean> dataSyncSemaphores() {
-        return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig)
+        return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig,
+            "synchronization-caches")
             .getMap("dataSyncSemaphores");
     }
-
-    private HazelcastInstance createHazelcastInstance(
-        final String hazelcastInstanceName, final NamedConfig namedConfig) {
-        return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig));
-    }
-
-    private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) {
-        final Config config = new Config(instanceName);
-        if (namedConfig instanceof MapConfig) {
-            config.addMapConfig((MapConfig) namedConfig);
-        }
-        if (namedConfig instanceof QueueConfig) {
-            config.addQueueConfig((QueueConfig) namedConfig);
-        }
-        config.setClusterName("synchronization-caches");
-        updateDiscoveryMode(config);
-        return config;
-    }
-
-    private static QueueConfig createQueueConfig() {
-        final QueueConfig commonQueueConfig = new QueueConfig("defaultQueueConfig");
-        commonQueueConfig.setBackupCount(3);
-        commonQueueConfig.setAsyncBackupCount(3);
-        return commonQueueConfig;
-    }
-
-    private static MapConfig createMapConfig(final String configName) {
-        final MapConfig mapConfig = new MapConfig(configName);
-        mapConfig.setBackupCount(3);
-        mapConfig.setAsyncBackupCount(3);
-        return mapConfig;
-    }
-
-    private void updateDiscoveryMode(final Config config) {
-        if (cacheKubernetesEnabled) {
-            log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName);
-            config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
-                    .setProperty("service-name", cacheKubernetesServiceName);
-        }
-    }
-
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java
new file mode 100644 (file)
index 0000000..d2c3dc2
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc;
+
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.map.IMap;
+import java.util.Set;
+import org.onap.cps.cache.HazelcastCacheConfig;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Core infrastructure of the hazelcast distributed cache for subscription forward config use cases.
+ */
+@Configuration
+public class ForwardedSubscriptionEventCacheConfig extends HazelcastCacheConfig {
+
+    private static final MapConfig forwardedSubscriptionEventCacheMapConfig =
+        createMapConfig("forwardedSubscriptionEventCacheMapConfig");
+
+    /**
+     * Distributed instance of forwarded subscription information cache that contains subscription event
+     * id by dmi names as properties.
+     *
+     * @return configured map of subscription event ids as keys to sets of dmi names for values
+     */
+    @Bean
+    public IMap<String, Set<String>> forwardedSubscriptionEventCache() {
+        return createHazelcastInstance("hazelCastInstanceSubscriptionEvents",
+            forwardedSubscriptionEventCacheMapConfig, "cps-ncmp-service-caches")
+            .getMap("forwardedSubscriptionEventCache");
+    }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java
new file mode 100644 (file)
index 0000000..e7edecf
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc;
+
+import com.hazelcast.map.IMap;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@RequiredArgsConstructor
+public class ResponseTimeoutTask implements Runnable {
+
+    private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
+    private final String subscriptionEventId;
+
+    @Override
+    public void run() {
+        if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
+            final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
+            if (dmiNames.isEmpty()) {
+                //TODO full outcome response here
+                log.info("placeholder to create full outcome response for subscriptionEventId: {}.",
+                    subscriptionEventId);
+            } else {
+                //TODO partial outcome response here
+                log.info("placeholder to create partial outcome response for subscriptionEventId: {}.",
+                    subscriptionEventId);
+            }
+            forwardedSubscriptionEventCache.remove(subscriptionEventId);
+        }
+    }
+}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java
new file mode 100644 (file)
index 0000000..b332ad1
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc;
+
+import com.hazelcast.map.IMap;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class SubscriptionEventResponseConsumer {
+
+    private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
+
+    @Value("${app.ncmp.avc.subscription-outcome-topic}")
+    private String subscriptionOutcomeEventTopic;
+
+    @Value("${notification.enabled:true}")
+    private boolean notificationFeatureEnabled;
+
+    @Value("${ncmp.model-loader.subscription:false}")
+    private boolean subscriptionModelLoaderEnabled;
+
+    /**
+     * Consume subscription response event.
+     *
+     * @param subscriptionEventResponse the event to be consumed
+     */
+    @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
+        properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"})
+    public void consumeSubscriptionEventResponse(final SubscriptionEventResponse subscriptionEventResponse) {
+        log.info("subscription event response of clientId: {} is received.", subscriptionEventResponse.getClientId());
+        final String subscriptionEventId = subscriptionEventResponse.getClientId()
+            + subscriptionEventResponse.getSubscriptionName();
+        final boolean createOutcomeResponse;
+        if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
+            forwardedSubscriptionEventCache.get(subscriptionEventId).remove(subscriptionEventResponse.getDmiName());
+            createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
+            if (createOutcomeResponse) {
+                forwardedSubscriptionEventCache.remove(subscriptionEventId);
+            }
+        } else {
+            createOutcomeResponse = true;
+        }
+        if (subscriptionModelLoaderEnabled) {
+            updateSubscriptionEvent(subscriptionEventResponse);
+        }
+        if (createOutcomeResponse && notificationFeatureEnabled) {
+            log.info("placeholder to create full outcome response for subscriptionEventId: {}.", subscriptionEventId);
+            //TODO Create outcome response
+        }
+    }
+
+    private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) {
+        log.info("placeholder to update persisted subscription for subscriptionEventId: {}.",
+            subscriptionEventResponse.getClientId() + subscriptionEventResponse.getSubscriptionName());
+    }
+}
\ No newline at end of file
index 1fb72a5..4afa051 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.events.avcsubscription;
 
+import com.hazelcast.map.IMap;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.impl.event.avc.ResponseTimeoutTask;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
 import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer;
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
 import org.onap.cps.ncmp.event.model.SubscriptionEvent;
 import org.onap.cps.spi.exceptions.OperationNotYetSupportedException;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 
@@ -44,8 +52,15 @@ public class SubscriptionEventForwarder {
 
     private final InventoryPersistence inventoryPersistence;
     private final EventsPublisher<SubscriptionEvent> eventsPublisher;
+    private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
+
+    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+
     private static final String DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX = "ncmp-dmi-cm-avc-subscription-";
 
+    @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms:30000}")
+    private int dmiResponseTimeoutInMs;
+
     /**
      * Forward subscription event.
      *
@@ -65,15 +80,32 @@ public class SubscriptionEventForwarder {
 
         final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName
                 = DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
-        dmiPropertiesPerCmHandleIdPerServiceName.forEach((dmiServiceName, cmHandlePropertiesMap) -> {
-            subscriptionEvent.getEvent().getPredicates().setTargets(Collections
-                    .singletonList(cmHandlePropertiesMap));
-            final String eventKey = createEventKey(subscriptionEvent, dmiServiceName);
-            eventsPublisher.publishEvent(DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiServiceName, eventKey,
-                    subscriptionEvent);
+
+        final Set<String> dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet());
+        startResponseTimeout(subscriptionEvent, dmisToRespond);
+        forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent);
+    }
+
+    private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap,
+                                    final SubscriptionEvent subscriptionEvent) {
+        dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> {
+            subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap));
+            final String eventKey = createEventKey(subscriptionEvent, dmiName);
+            eventsPublisher.publishEvent(
+                DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiName, eventKey, subscriptionEvent);
         });
     }
 
+    private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) {
+        final String subscriptionEventId = subscriptionEvent.getEvent().getSubscription().getClientID()
+            + subscriptionEvent.getEvent().getSubscription().getName();
+
+        forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond);
+        final ResponseTimeoutTask responseTimeoutTask =
+            new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventId);
+        executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS);
+    }
+
     private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) {
         return subscriptionEvent.getEvent().getSubscription().getClientID()
             + "-"
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java
new file mode 100644 (file)
index 0000000..95e773c
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Getter
+@Setter
+public class SubscriptionEventResponse {
+    private String clientId;
+    private String subscriptionName;
+    private String dmiName;
+    private Map<String, SubscriptionStatus> cmHandleIdToStatus;
+}
index 5a418fd..659779a 100644 (file)
@@ -52,10 +52,10 @@ public class SubscriptionModelLoader implements ModelLoader {
     private static final String SUBSCRIPTION_SCHEMASET_NAME = "subscriptions";
     private static final String SUBSCRIPTION_REGISTRY_DATANODE_NAME = "subscription-registry";
 
-    @Value("${ncmp.model-loader.maximumAttemptCount:20}")
+    @Value("${ncmp.model-loader.maximum-attempt-count:20}")
     private int maximumAttemptCount;
 
-    @Value("${ncmp.model-loader.retryTimeMs:1000}")
+    @Value("${ncmp.timers.model-loader.retry-time-ms:1000}")
     private long retryTimeMs;
 
     @Value("${ncmp.model-loader.subscription:false}")
@@ -99,7 +99,7 @@ public class SubscriptionModelLoader implements ModelLoader {
                 }
             } else {
                 throw new NcmpStartUpException("Retrieval of NCMP dataspace fails",
-                        "NCMP dataspace does not exist");
+                    "NCMP dataspace does not exist");
             }
         }
     }
@@ -139,7 +139,7 @@ public class SubscriptionModelLoader implements ModelLoader {
      */
     @Override
     public boolean createAnchor(final String dataspaceName, final String schemaSetName,
-                             final String anchorName) {
+                                final String anchorName) {
         try {
             cpsAdminService.createAnchor(dataspaceName, schemaSetName, anchorName);
         } catch (final AlreadyDefinedException exception) {
index 6829d83..567debd 100644 (file)
@@ -51,8 +51,8 @@ class SynchronizationCacheConfigSpec extends Specification {
             assert null != moduleSyncStartedOnCmHandles
         and: 'system is able to create an instance of a map to hold data sync semaphores'
             assert null != dataSyncSemaphores
-        and: 'there 3 instances'
-            assert Hazelcast.allHazelcastInstances.size() == 3
+        and: 'there are at least 3 instances'
+            assert Hazelcast.allHazelcastInstances.size() > 2
         and: 'they have the correct names (in any order)'
             assert Hazelcast.allHazelcastInstances.name.containsAll('moduleSyncWorkQueue', 'moduleSyncStartedOnCmHandles', 'dataSyncSemaphores' )
     }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfigSpec.groovy
new file mode 100644 (file)
index 0000000..7448daf
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc
+
+import com.hazelcast.config.Config
+import com.hazelcast.core.Hazelcast
+import com.hazelcast.map.IMap
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+@SpringBootTest(classes = [ForwardedSubscriptionEventCacheConfig])
+class ForwardedSubscriptionEventCacheConfigSpec extends Specification {
+
+    @Autowired
+    private IMap<String, Set<String>> forwardedSubscriptionEventCache
+
+    def 'Embedded (hazelcast) cache for Forwarded Subscription Event Cache.'() {
+        expect: 'system is able to create an instance of the Forwarded Subscription Event Cache'
+            assert null != forwardedSubscriptionEventCache
+        and: 'there is at least 1 instance'
+            assert Hazelcast.allHazelcastInstances.size() > 0
+        and: 'Forwarded Subscription Event Cache is present'
+            assert Hazelcast.allHazelcastInstances.name.contains('hazelCastInstanceSubscriptionEvents')
+    }
+
+    def 'Verify configs for Distributed Caches'(){
+        given: 'the Forwarded Subscription Event Cache config'
+            def forwardedSubscriptionEventCacheConfig =  Hazelcast.getHazelcastInstanceByName('hazelCastInstanceSubscriptionEvents').config.mapConfigs.get('forwardedSubscriptionEventCacheMapConfig')
+        expect: 'system created instance with correct config'
+            assert forwardedSubscriptionEventCacheConfig.backupCount == 3
+            assert forwardedSubscriptionEventCacheConfig.asyncBackupCount == 3
+    }
+
+    def 'Verify deployment network configs for Distributed Caches'() {
+        given: 'the Forwarded Subscription Event Cache config'
+            def forwardedSubscriptionEventCacheNetworkConfig = Hazelcast.getHazelcastInstanceByName('hazelCastInstanceSubscriptionEvents').config.networkConfig
+        expect: 'system created instance with correct config'
+            assert forwardedSubscriptionEventCacheNetworkConfig.join.autoDetectionConfig.enabled
+            assert !forwardedSubscriptionEventCacheNetworkConfig.join.kubernetesConfig.enabled
+    }
+
+    def 'Verify network config'() {
+        given: 'Synchronization config object and test configuration'
+            def objectUnderTest = new ForwardedSubscriptionEventCacheConfig()
+            def testConfig = new Config()
+        when: 'kubernetes properties are enabled'
+            objectUnderTest.cacheKubernetesEnabled = true
+            objectUnderTest.cacheKubernetesServiceName = 'test-service-name'
+        and: 'method called to update the discovery mode'
+            objectUnderTest.updateDiscoveryMode(testConfig)
+        then: 'applied properties are reflected'
+            assert testConfig.networkConfig.join.kubernetesConfig.enabled
+            assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name'
+
+    }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy
new file mode 100644 (file)
index 0000000..a673462
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (c) 2023 Nordix Foundation.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.hazelcast.map.IMap
+import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.api.models.SubscriptionEventResponse
+import org.onap.cps.utils.JsonObjectMapper
+import org.springframework.boot.test.context.SpringBootTest
+
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec {
+
+    IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
+
+    def objectUnderTest = new SubscriptionEventResponseConsumer(mockForwardedSubscriptionEventCache)
+
+
+    def 'Consume Subscription Event Response where all DMIs have responded'() {
+        given: 'a subscription event response with a clientId, subscriptionName and dmiName'
+            def testEventReceived = new SubscriptionEventResponse()
+            testEventReceived.clientId = 'some-client-id'
+            testEventReceived.subscriptionName = 'some-subscription-name'
+            testEventReceived.dmiName = 'some-dmi-name'
+        and: 'notifications are enabled'
+            objectUnderTest.notificationFeatureEnabled = true
+        and: 'subscription model loader is enabled'
+            objectUnderTest.subscriptionModelLoaderEnabled = true
+        when: 'the valid event is consumed'
+            objectUnderTest.consumeSubscriptionEventResponse(testEventReceived)
+        then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event'
+            1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true
+            1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name'] as Set)
+        and: 'the forwarded subscription event cache returns an empty Map when the dmiName has been removed'
+            1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> ([] as Set)
+        and: 'the subscription event is removed from the map'
+            1 * mockForwardedSubscriptionEventCache.remove('some-client-idsome-subscription-name')
+    }
+
+    def 'Consume Subscription Event Response where another DMI has not yet responded'() {
+        given: 'a subscription event response with a clientId, subscriptionName and dmiName'
+            def testEventReceived = new SubscriptionEventResponse()
+            testEventReceived.clientId = 'some-client-id'
+            testEventReceived.subscriptionName = 'some-subscription-name'
+            testEventReceived.dmiName = 'some-dmi-name'
+        and: 'notifications are enabled'
+            objectUnderTest.notificationFeatureEnabled = true
+        and: 'subscription model loader is enabled'
+            objectUnderTest.subscriptionModelLoaderEnabled = true
+        when: 'the valid event is consumed'
+            objectUnderTest.consumeSubscriptionEventResponse(testEventReceived)
+        then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event'
+            1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true
+            1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name', 'non-responded-dmi'] as Set)
+        and: 'the forwarded subscription event cache returns an empty Map when the dmiName has been removed'
+            1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['non-responded-dmi'] as Set)
+        and: 'the subscription event is not removed from the map'
+            0 * mockForwardedSubscriptionEventCache.remove(_)
+    }
+}
index 3a7aa48..f2ff1f7 100644 (file)
@@ -48,7 +48,7 @@ class SubscriptionEventMapperSpec extends Specification {
             def result = objectUnderTest.toYangModelSubscriptionEvent(testEventToMap)
         then: 'the resulting yang model subscription event contains the correct clientId'
             assert result.clientId == "SCO-9989752"
-        and: 'client name'
+        and: 'subscription name'
             assert result.subscriptionName == "cm-subscription-001"
         and: 'is tagged value is false'
             assert !result.isTagged
@@ -60,4 +60,21 @@ class SubscriptionEventMapperSpec extends Specification {
             assert result.topic == null
     }
 
+    def 'Map null subscription event to yang model subscription event where #scenario'() {
+        given: 'a new Subscription Event with no data'
+            def testEventToMap = new SubscriptionEvent()
+        when: 'the event is mapped to a yang model subscription'
+            def result = objectUnderTest.toYangModelSubscriptionEvent(testEventToMap)
+        then: 'the resulting yang model subscription event contains null clientId'
+            assert result.clientId == null
+        and: 'subscription name is null'
+            assert result.subscriptionName == null
+        and: 'is tagged value is false'
+            assert result.isTagged == false
+        and: 'predicates is null'
+            assert result.predicates == null
+        and: 'the topic is null'
+            assert result.topic == null
+    }
+
 }
\ No newline at end of file
index 2b0adf3..457eb6f 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.cps.ncmp.api.impl.events.avcsubscription
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.hazelcast.map.IMap
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence
@@ -29,20 +30,28 @@ import org.onap.cps.ncmp.event.model.SubscriptionEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.spi.exceptions.OperationNotYetSupportedException
 import org.onap.cps.utils.JsonObjectMapper
+import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
+import spock.util.concurrent.BlockingVariable
 
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionEventForwarder])
 class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
 
-    def mockInventoryPersistence = Mock(InventoryPersistence)
-    def mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>)
-    def objectUnderTest = new SubscriptionEventForwarder(mockInventoryPersistence, mockSubscriptionEventPublisher)
+    @Autowired
+    SubscriptionEventForwarder objectUnderTest
+
+    @SpringBean
+    InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
+    @SpringBean
+    EventsPublisher<SubscriptionEvent> mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>)
+    @SpringBean
+    IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
-    def 'Forward valid CM create subscription'() {
+    def 'Forward valid CM create subscription and simulate timeout where #scenario'() {
         given: 'an event'
             def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
@@ -52,9 +61,17 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
                 createYangModelCmHandleWithDmiProperty(2, 1,"shape","square"),
                 createYangModelCmHandleWithDmiProperty(3, 2,"shape","triangle")
             ]
+        and: 'the thread creation delay is reduced to 2 seconds for testing'
+            objectUnderTest.dmiResponseTimeoutInMs = 2000
+        and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds'
+            def block = new BlockingVariable<Object>(5)
         when: 'the valid event is forwarded'
             objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
-        then: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'
+        then: 'An asynchronous call is made to the blocking variable'
+            block.get()
+        then: 'the event is added to the forwarded subscription event cache'
+            1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set)
+        and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'
             1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1",
                 subscriptionEvent -> {
                     Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0)
@@ -68,6 +85,15 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
                     targets["CMHandle3"] == ["shape":"triangle"]
                 }
             )
+        and: 'a separate thread has been created where the map is polled'
+            1 * mockForwardedSubscriptionEventCache.containsKey("SCO-9989752cm-subscription-001") >> true
+            1 * mockForwardedSubscriptionEventCache.get(_) >> (DMINamesInMap)
+        and: 'the subscription id is removed from the event cache map returning the asynchronous blocking variable'
+            1 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> {block.set(_)}
+        where:
+            scenario                                  | DMINamesInMap
+            'there are dmis which have not responded' | ["DMIName1", "DMIName2"] as Set
+            'all dmis have responded '                | [] as Set
     }
 
     def 'Forward CM create subscription where target CM Handles are #scenario'() {
index 3acba0b..5ee6b38 100644 (file)
 
 package org.onap.cps.cache;
 
-import com.hazelcast.config.Config;
 import com.hazelcast.config.MapConfig;
-import com.hazelcast.config.NamedConfig;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.map.IMap;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 /**
  * Core infrastructure of the hazelcast distributed cache for anchor data config use cases.
  */
-@Slf4j
 @Configuration
-public class AnchorDataCacheConfig {
-
-    @Value("${hazelcast.mode.kubernetes.enabled}")
-    private boolean cacheKubernetesEnabled;
-
-    @Value("${hazelcast.mode.kubernetes.service-name}")
-    private String cacheKubernetesServiceName;
+public class AnchorDataCacheConfig extends HazelcastCacheConfig {
 
     private static final MapConfig anchorDataCacheMapConfig = createMapConfig("anchorDataCacheMapConfig");
 
@@ -53,36 +40,7 @@ public class AnchorDataCacheConfig {
      */
     @Bean
     public IMap<String, AnchorDataCacheEntry> anchorDataCache() {
-        return createHazelcastInstance("hazelCastInstanceCpsCore", anchorDataCacheMapConfig)
-                .getMap("anchorDataCache");
-    }
-
-    private HazelcastInstance createHazelcastInstance(final String hazelcastInstanceName,
-            final NamedConfig namedConfig) {
-        return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig));
-    }
-
-    private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) {
-        final Config config = new Config(instanceName);
-        config.addMapConfig((MapConfig) namedConfig);
-        config.setClusterName("cps-service-caches");
-        updateDiscoveryMode(config);
-        return config;
+        return createHazelcastInstance("hazelCastInstanceCpsCore", anchorDataCacheMapConfig, "cps-service-caches")
+            .getMap("anchorDataCache");
     }
-
-    private static MapConfig createMapConfig(final String configName) {
-        final MapConfig mapConfig = new MapConfig(configName);
-        mapConfig.setBackupCount(3);
-        mapConfig.setAsyncBackupCount(3);
-        return mapConfig;
-    }
-
-    private void updateDiscoveryMode(final Config config) {
-        if (cacheKubernetesEnabled) {
-            log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName);
-            config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
-                    .setProperty("service-name", cacheKubernetesServiceName);
-        }
-    }
-
 }
diff --git a/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java b/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java
new file mode 100644 (file)
index 0000000..4aebcea
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.cache;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.NamedConfig;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+
+/**
+ * Core infrastructure of the hazelcast distributed cache.
+ */
+@Slf4j
+public class HazelcastCacheConfig {
+
+    @Value("${hazelcast.mode.kubernetes.enabled}")
+    protected boolean cacheKubernetesEnabled;
+
+    @Value("${hazelcast.mode.kubernetes.service-name}")
+    protected String cacheKubernetesServiceName;
+
+    protected HazelcastInstance createHazelcastInstance(final String hazelcastInstanceName,
+                                                        final NamedConfig namedConfig, final String clusterName) {
+        return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig, clusterName));
+    }
+
+    private Config initializeConfig(final String instanceName, final NamedConfig namedConfig,
+                                    final String clusterName) {
+        final Config config = new Config(instanceName);
+        if (namedConfig instanceof MapConfig) {
+            config.addMapConfig((MapConfig) namedConfig);
+        }
+        if (namedConfig instanceof QueueConfig) {
+            config.addQueueConfig((QueueConfig) namedConfig);
+        }
+        config.setClusterName(clusterName);
+        updateDiscoveryMode(config);
+        return config;
+    }
+
+    protected static MapConfig createMapConfig(final String configName) {
+        final MapConfig mapConfig = new MapConfig(configName);
+        mapConfig.setBackupCount(3);
+        mapConfig.setAsyncBackupCount(3);
+        return mapConfig;
+    }
+
+    protected static QueueConfig createQueueConfig(final String configName) {
+        final QueueConfig commonQueueConfig = new QueueConfig(configName);
+        commonQueueConfig.setBackupCount(3);
+        commonQueueConfig.setAsyncBackupCount(3);
+        return commonQueueConfig;
+    }
+
+    protected void updateDiscoveryMode(final Config config) {
+        if (cacheKubernetesEnabled) {
+            log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName);
+            config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
+                .setProperty("service-name", cacheKubernetesServiceName);
+        }
+    }
+
+}