+ findDmisAndRespond(subscriptionEvent, eventType, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName);
+ }
+
+ private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final String eventType,
+ final List<String> cmHandleTargetsAsStrings,
+ final Map<String, Map<String, Map<String, String>>>
+ dmiPropertiesPerCmHandleIdPerServiceName) {
+ final SubscriptionEventResponse emptySubscriptionEventResponse =
+ new SubscriptionEventResponse().withData(new Data());
+ emptySubscriptionEventResponse.getData().setSubscriptionName(
+ subscriptionEvent.getData().getSubscription().getName());
+ emptySubscriptionEventResponse.getData().setClientId(
+ subscriptionEvent.getData().getSubscription().getClientID());
+ final List<String> cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream()
+ .map(Map.Entry::getValue).map(Map::keySet).flatMap(Set::stream).collect(Collectors.toList());
+
+ final List<String> targetCmHandlesDoesNotExistInDb = new ArrayList<>(cmHandleTargetsAsStrings);
+ targetCmHandlesDoesNotExistInDb.removeAll(cmHandlesThatExistsInDb);
+
+ final Set<String> dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet());
+
+ if (dmisToRespond.isEmpty() || !targetCmHandlesDoesNotExistInDb.isEmpty()) {
+ updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb);
+ }
+ if (dmisToRespond.isEmpty()) {
+ subscriptionEventResponseOutcome.sendResponse(emptySubscriptionEventResponse,
+ "subscriptionCreatedStatus");
+ } else {
+ startResponseTimeout(emptySubscriptionEventResponse, dmisToRespond);
+ final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent =
+ clientSubscriptionEventMapper.toNcmpSubscriptionEvent(subscriptionEvent);
+ forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent, eventType);
+ }
+ }
+
+ private void startResponseTimeout(final SubscriptionEventResponse emptySubscriptionEventResponse,
+ final Set<String> dmisToRespond) {
+ final String subscriptionClientId = emptySubscriptionEventResponse.getData().getClientId();
+ final String subscriptionName = emptySubscriptionEventResponse.getData().getSubscriptionName();
+ final String subscriptionEventId = subscriptionClientId + subscriptionName;
+
+ forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond,
+ ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
+ final ResponseTimeoutTask responseTimeoutTask =
+ new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome,
+ emptySubscriptionEventResponse);
+ try {
+ executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS);
+ } catch (final RuntimeException ex) {
+ log.info("Caught exception in ScheduledExecutorService for ResponseTimeoutTask. StackTrace: {}",
+ ex.toString());
+ }
+ }
+
+ private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap,
+ final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent
+ ncmpSubscriptionEvent, final String eventType) {
+ dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> {
+ final List<CmHandle> cmHandleTargets = cmHandlePropertiesMap.entrySet().stream().map(
+ cmHandleAndProperties -> {
+ final CmHandle cmHandle = new CmHandle();
+ cmHandle.setId(cmHandleAndProperties.getKey());
+ cmHandle.setAdditionalProperties(cmHandleAndProperties.getValue());
+ return cmHandle;
+ }).collect(Collectors.toList());
+
+ ncmpSubscriptionEvent.getData().getPredicates().setTargets(cmHandleTargets);
+ final String eventKey = createEventKey(ncmpSubscriptionEvent, dmiName);
+ final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName;
+
+ final CloudEvent ncmpSubscriptionCloudEvent =
+ SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey, eventType);
+ eventsPublisher.publishCloudEvent(dmiAvcSubscriptionTopic, eventKey, ncmpSubscriptionCloudEvent);