Subscription Create Event Outcome Kafka Part
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / event / avc / SubscriptionEventResponseConsumer.java
index c173862..eb3daeb 100644 (file)
@@ -22,9 +22,12 @@ package org.onap.cps.ncmp.api.impl.event.avc;
 
 import com.hazelcast.map.IMap;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseMapper;
+import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome;
 import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
 import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
@@ -38,13 +41,9 @@ import org.springframework.stereotype.Component;
 public class SubscriptionEventResponseConsumer {
 
     private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
-
     private final SubscriptionPersistence subscriptionPersistence;
-
     private final SubscriptionEventResponseMapper subscriptionEventResponseMapper;
-
-    @Value("${app.ncmp.avc.subscription-outcome-topic}")
-    private String subscriptionOutcomeEventTopic;
+    private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
 
     @Value("${notification.enabled:true}")
     private boolean notificationFeatureEnabled;
@@ -55,30 +54,36 @@ public class SubscriptionEventResponseConsumer {
     /**
      * Consume subscription response event.
      *
-     * @param subscriptionEventResponse the event to be consumed
+     * @param subscriptionEventResponseConsumerRecord the event to be consumed
      */
     @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
         properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"})
-    public void consumeSubscriptionEventResponse(final SubscriptionEventResponse subscriptionEventResponse) {
-        log.info("subscription event response of clientId: {} is received.", subscriptionEventResponse.getClientId());
-        final String subscriptionEventId = subscriptionEventResponse.getClientId()
-            + subscriptionEventResponse.getSubscriptionName();
-        final boolean createOutcomeResponse;
+    public void consumeSubscriptionEventResponse(
+            final ConsumerRecord<String, SubscriptionEventResponse> subscriptionEventResponseConsumerRecord) {
+        final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value();
+        final String clientId = subscriptionEventResponse.getClientId();
+        log.info("subscription event response of clientId: {} is received.", clientId);
+        final String subscriptionName = subscriptionEventResponse.getSubscriptionName();
+        final String subscriptionEventId = clientId + subscriptionName;
+        boolean isFullOutcomeResponse = false;
         if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
-            forwardedSubscriptionEventCache.get(subscriptionEventId).remove(subscriptionEventResponse.getDmiName());
-            createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
-            if (createOutcomeResponse) {
+            final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
+
+            dmiNames.remove(subscriptionEventResponse.getDmiName());
+            forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
+                    ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
+            isFullOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
+
+            if (isFullOutcomeResponse) {
                 forwardedSubscriptionEventCache.remove(subscriptionEventId);
             }
-        } else {
-            createOutcomeResponse = true;
         }
         if (subscriptionModelLoaderEnabled) {
             updateSubscriptionEvent(subscriptionEventResponse);
         }
-        if (createOutcomeResponse && notificationFeatureEnabled) {
-            log.info("placeholder to create full outcome response for subscriptionEventId: {}.", subscriptionEventId);
-            //TODO Create outcome response
+        if (isFullOutcomeResponse && notificationFeatureEnabled) {
+            subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName,
+                    isFullOutcomeResponse);
         }
     }