import com.hazelcast.map.IMap;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseMapper;
+import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
public class SubscriptionEventResponseConsumer {
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
-
private final SubscriptionPersistence subscriptionPersistence;
-
private final SubscriptionEventResponseMapper subscriptionEventResponseMapper;
-
- @Value("${app.ncmp.avc.subscription-outcome-topic}")
- private String subscriptionOutcomeEventTopic;
+ private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
@Value("${notification.enabled:true}")
private boolean notificationFeatureEnabled;
/**
* Consume subscription response event.
*
- * @param subscriptionEventResponse the event to be consumed
+ * @param subscriptionEventResponseConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"})
- public void consumeSubscriptionEventResponse(final SubscriptionEventResponse subscriptionEventResponse) {
- log.info("subscription event response of clientId: {} is received.", subscriptionEventResponse.getClientId());
- final String subscriptionEventId = subscriptionEventResponse.getClientId()
- + subscriptionEventResponse.getSubscriptionName();
- final boolean createOutcomeResponse;
+ public void consumeSubscriptionEventResponse(
+ final ConsumerRecord<String, SubscriptionEventResponse> subscriptionEventResponseConsumerRecord) {
+ final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value();
+ final String clientId = subscriptionEventResponse.getClientId();
+ log.info("subscription event response of clientId: {} is received.", clientId);
+ final String subscriptionName = subscriptionEventResponse.getSubscriptionName();
+ final String subscriptionEventId = clientId + subscriptionName;
+ boolean isFullOutcomeResponse = false;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
- forwardedSubscriptionEventCache.get(subscriptionEventId).remove(subscriptionEventResponse.getDmiName());
- createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
- if (createOutcomeResponse) {
+ final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
+
+ dmiNames.remove(subscriptionEventResponse.getDmiName());
+ forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
+ ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
+ isFullOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
+
+ if (isFullOutcomeResponse) {
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
- } else {
- createOutcomeResponse = true;
}
if (subscriptionModelLoaderEnabled) {
updateSubscriptionEvent(subscriptionEventResponse);
}
- if (createOutcomeResponse && notificationFeatureEnabled) {
- log.info("placeholder to create full outcome response for subscriptionEventId: {}.", subscriptionEventId);
- //TODO Create outcome response
+ if (isFullOutcomeResponse && notificationFeatureEnabled) {
+ subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName,
+ isFullOutcomeResponse);
}
}