Subscription Creation: NCMP to Client CloudEvent transformation
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / events / avcsubscription / SubscriptionEventResponseConsumer.java
index 20df706..ddb9fd6 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.cps.ncmp.api.impl.events.avcsubscription;
 
 import com.hazelcast.map.IMap;
+import io.cloudevents.CloudEvent;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
@@ -32,8 +33,9 @@ import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEven
 import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
 import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
 import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
+import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventResponseCloudMapper;
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
 import org.onap.cps.spi.model.DataNode;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -61,19 +63,21 @@ public class SubscriptionEventResponseConsumer {
      * @param subscriptionEventResponseConsumerRecord the event to be consumed
      */
     @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
-        properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"})
+            containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
     public void consumeSubscriptionEventResponse(
-            final ConsumerRecord<String, SubscriptionEventResponse> subscriptionEventResponseConsumerRecord) {
-        final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value();
-        final String clientId = subscriptionEventResponse.getClientId();
+            final ConsumerRecord<String, CloudEvent> subscriptionEventResponseConsumerRecord) {
+        final CloudEvent cloudEvent = subscriptionEventResponseConsumerRecord.value();
+        final String eventType = subscriptionEventResponseConsumerRecord.value().getType();
+        final SubscriptionEventResponse subscriptionEventResponse =
+                SubscriptionEventResponseCloudMapper.toSubscriptionEventResponse(cloudEvent);
+        final String clientId = subscriptionEventResponse.getData().getClientId();
         log.info("subscription event response of clientId: {} is received.", clientId);
-        final String subscriptionName = subscriptionEventResponse.getSubscriptionName();
+        final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
         final String subscriptionEventId = clientId + subscriptionName;
         boolean createOutcomeResponse = false;
         if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
             final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
-
-            dmiNames.remove(subscriptionEventResponse.getDmiName());
+            dmiNames.remove(subscriptionEventResponse.getData().getDmiName());
             forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
                     ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
             createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
@@ -84,7 +88,7 @@ public class SubscriptionEventResponseConsumer {
         if (createOutcomeResponse
                 && notificationFeatureEnabled
                 && hasNoPendingCmHandles(clientId, subscriptionName)) {
-            subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName);
+            subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse, eventType);
             forwardedSubscriptionEventCache.remove(subscriptionEventId);
         }
     }
@@ -92,10 +96,15 @@ public class SubscriptionEventResponseConsumer {
     private boolean hasNoPendingCmHandles(final String clientId, final String subscriptionName) {
         final Collection<DataNode> dataNodeSubscription = subscriptionPersistence.getCmHandlesForSubscriptionEvent(
                 clientId, subscriptionName);
-        final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
-                DataNodeHelper.getCmHandleIdToStatusMapFromDataNodes(
-                dataNodeSubscription);
-        return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING);
+        final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapOriginal =
+                DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(dataNodeSubscription);
+        for (final Map<String, String> statusAndDetailsMap : cmHandleIdToStatusAndDetailsAsMapOriginal.values()) {
+            final String status = statusAndDetailsMap.get("status");
+            if (SubscriptionStatus.PENDING.toString().equals(status)) {
+                return false;
+            }
+        }
+        return true;
     }
 
     private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) {