Streamline outcome response for subscription creation
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / events / avcsubscription / SubscriptionEventResponseConsumer.java
index a1860a6..20df706 100644 (file)
@@ -21,6 +21,8 @@
 package org.onap.cps.ncmp.api.impl.events.avcsubscription;
 
 import com.hazelcast.map.IMap;
+import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.RequiredArgsConstructor;
@@ -28,8 +30,11 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig;
 import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
+import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
 import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.onap.cps.spi.model.DataNode;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
@@ -64,28 +69,35 @@ public class SubscriptionEventResponseConsumer {
         log.info("subscription event response of clientId: {} is received.", clientId);
         final String subscriptionName = subscriptionEventResponse.getSubscriptionName();
         final String subscriptionEventId = clientId + subscriptionName;
-        boolean isFullOutcomeResponse = false;
+        boolean createOutcomeResponse = false;
         if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
             final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
 
             dmiNames.remove(subscriptionEventResponse.getDmiName());
             forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
                     ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
-            isFullOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
-
-            if (isFullOutcomeResponse) {
-                forwardedSubscriptionEventCache.remove(subscriptionEventId);
-            }
+            createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
         }
         if (subscriptionModelLoaderEnabled) {
             updateSubscriptionEvent(subscriptionEventResponse);
         }
-        if (isFullOutcomeResponse && notificationFeatureEnabled) {
-            subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName,
-                    isFullOutcomeResponse);
+        if (createOutcomeResponse
+                && notificationFeatureEnabled
+                && hasNoPendingCmHandles(clientId, subscriptionName)) {
+            subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName);
+            forwardedSubscriptionEventCache.remove(subscriptionEventId);
         }
     }
 
+    private boolean hasNoPendingCmHandles(final String clientId, final String subscriptionName) {
+        final Collection<DataNode> dataNodeSubscription = subscriptionPersistence.getCmHandlesForSubscriptionEvent(
+                clientId, subscriptionName);
+        final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
+                DataNodeHelper.getCmHandleIdToStatusMapFromDataNodes(
+                dataNodeSubscription);
+        return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING);
+    }
+
     private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) {
         final YangModelSubscriptionEvent yangModelSubscriptionEvent =
                 subscriptionEventResponseMapper