X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Fevents%2Favcsubscription%2FSubscriptionEventForwarder.java;h=1d87a057a74783da446da9864efbd0fa2ba8118b;hb=9965f958530e0341b109a7df20c88103bdd83862;hp=4afa051d301e510ca439ac1b4ff640ad0bc429f8;hpb=3c0ea89e2bd77d2f4f86f0729643455e1b29c5ac;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java index 4afa051d3..1d87a057a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java @@ -21,6 +21,7 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription; import com.hazelcast.map.IMap; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -34,10 +35,14 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.api.impl.event.avc.ResponseTimeoutTask; +import org.apache.kafka.common.header.Headers; +import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; import org.onap.cps.ncmp.event.model.SubscriptionEvent; import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; @@ -53,10 +58,12 @@ public class SubscriptionEventForwarder { private final InventoryPersistence inventoryPersistence; private final EventsPublisher eventsPublisher; private final IMap> forwardedSubscriptionEventCache; - + private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; + private final SubscriptionEventMapper subscriptionEventMapper; + private final SubscriptionPersistence subscriptionPersistence; private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - - private static final String DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX = "ncmp-dmi-cm-avc-subscription-"; + @Value("${app.ncmp.avc.subscription-forward-topic-prefix}") + private String dmiAvcSubscriptionTopicPrefix; @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms:30000}") private int dmiResponseTimeoutInMs; @@ -66,7 +73,8 @@ public class SubscriptionEventForwarder { * * @param subscriptionEvent the event to be forwarded */ - public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { + public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent, + final Headers eventHeaders) { final List cmHandleTargets = subscriptionEvent.getEvent().getPredicates().getTargets(); if (cmHandleTargets == null || cmHandleTargets.isEmpty() || cmHandleTargets.stream().anyMatch(id -> ((String) id).contains("*"))) { @@ -81,31 +89,64 @@ public class SubscriptionEventForwarder { final Map>> dmiPropertiesPerCmHandleIdPerServiceName = DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles); + findDmisAndRespond(subscriptionEvent, eventHeaders, cmHandleTargetsAsStrings, + dmiPropertiesPerCmHandleIdPerServiceName); + } + + private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final Headers eventHeaders, + final List cmHandleTargetsAsStrings, + final Map>> + dmiPropertiesPerCmHandleIdPerServiceName) { + final List cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream() + .map(Map.Entry::getValue).map(Map::keySet).flatMap(Set::stream).collect(Collectors.toList()); + + final List targetCmHandlesDoesNotExistInDb = new ArrayList<>(cmHandleTargetsAsStrings); + targetCmHandlesDoesNotExistInDb.removeAll(cmHandlesThatExistsInDb); + final Set dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet()); - startResponseTimeout(subscriptionEvent, dmisToRespond); - forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent); + + if (dmisToRespond.isEmpty() || !targetCmHandlesDoesNotExistInDb.isEmpty()) { + updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb); + } + if (dmisToRespond.isEmpty()) { + final String clientID = subscriptionEvent.getEvent().getSubscription().getClientID(); + final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName(); + subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName); + } else { + startResponseTimeout(subscriptionEvent, dmisToRespond); + forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent, eventHeaders); + } + } + + private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set dmisToRespond) { + final String subscriptionClientId = subscriptionEvent.getEvent().getSubscription().getClientID(); + final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName(); + final String subscriptionEventId = subscriptionClientId + subscriptionName; + + forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond, + ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); + final ResponseTimeoutTask responseTimeoutTask = + new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome, + subscriptionClientId, subscriptionName); + try { + executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); + } catch (final RuntimeException ex) { + log.info("Caught exception in ScheduledExecutorService for ResponseTimeoutTask. StackTrace: {}", + ex.toString()); + } } private void forwardEventToDmis(final Map>> dmiNameCmHandleMap, - final SubscriptionEvent subscriptionEvent) { + final SubscriptionEvent subscriptionEvent, + final Headers eventHeaders) { dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap)); final String eventKey = createEventKey(subscriptionEvent, dmiName); - eventsPublisher.publishEvent( - DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiName, eventKey, subscriptionEvent); + final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName; + eventsPublisher.publishEvent(dmiAvcSubscriptionTopic, eventKey, eventHeaders, subscriptionEvent); }); } - private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set dmisToRespond) { - final String subscriptionEventId = subscriptionEvent.getEvent().getSubscription().getClientID() - + subscriptionEvent.getEvent().getSubscription().getName(); - - forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond); - final ResponseTimeoutTask responseTimeoutTask = - new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventId); - executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); - } - private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) { return subscriptionEvent.getEvent().getSubscription().getClientID() + "-" @@ -114,4 +155,23 @@ public class SubscriptionEventForwarder { + dmiName; } + private void updatesCmHandlesToRejectedAndPersistSubscriptionEvent( + final SubscriptionEvent subscriptionEvent, + final List targetCmHandlesDoesNotExistInDb) { + final YangModelSubscriptionEvent yangModelSubscriptionEvent = + subscriptionEventMapper.toYangModelSubscriptionEvent(subscriptionEvent); + yangModelSubscriptionEvent.getPredicates() + .setTargetCmHandles(findRejectedCmHandles(targetCmHandlesDoesNotExistInDb, + yangModelSubscriptionEvent)); + subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent); + } + + private static List findRejectedCmHandles( + final List targetCmHandlesDoesNotExistInDb, + final YangModelSubscriptionEvent yangModelSubscriptionEvent) { + return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles().stream() + .filter(targetCmHandle -> targetCmHandlesDoesNotExistInDb.contains(targetCmHandle.getCmHandleId())) + .map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target.getCmHandleId(), + SubscriptionStatus.REJECTED)).collect(Collectors.toList()); + } }