dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
if (dmiCmSubscriptionPredicates.isEmpty()) {
- acceptAndPublishNcmpOutEventPerDmi(subscriptionId, dmiPluginName);
+ acceptAndPersistCmSubscriptionPerDmi(subscriptionId, dmiPluginName);
} else {
publishDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates);
}
"subscriptionCreateRequest", dmiInEvent);
}
- private void acceptAndPublishNcmpOutEventPerDmi(final String subscriptionId, final String dmiPluginName) {
+ private void acceptAndPersistCmSubscriptionPerDmi(final String subscriptionId, final String dmiPluginName) {
dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
CmSubscriptionStatus.ACCEPTED);
dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
private final NcmpOutEventMapper ncmpOutEventMapper;
private final DmiCacheHandler dmiCacheHandler;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
- private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>();
+ private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionIdAndEventType =
+ new ConcurrentHashMap<>();
/**
* Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
public void publishNcmpOutEvent(final String subscriptionId, final String eventType,
final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
- if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) {
+ final String taskKey = subscriptionId.concat(eventType);
+
+ if (isScheduledEvent && !scheduledTasksPerSubscriptionIdAndEventType.containsKey(taskKey)) {
final ScheduledFuture<?> scheduledFuture = scheduleAndPublishNcmpOutEvent(subscriptionId, eventType);
- scheduledTasksPerSubscriptionId.putIfAbsent(subscriptionId, scheduledFuture);
- log.debug("Scheduled the CmNotificationSubscriptionEvent for subscriptionId : {}", subscriptionId);
+ scheduledTasksPerSubscriptionIdAndEventType.putIfAbsent(taskKey, scheduledFuture);
+ log.debug("Scheduled the Cm Subscription Event for subscriptionId : {} and eventType : {}", subscriptionId,
+ eventType);
} else {
- cancelScheduledTaskForSubscriptionId(subscriptionId);
- publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
- log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId);
+ cancelScheduledTask(taskKey);
+ if (ncmpOutEvent != null) {
+ publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
+ log.debug("Published Cm Subscription Event on demand for subscriptionId : {} and eventType : {}",
+ subscriptionId, eventType);
+ }
}
}
TimeUnit.MILLISECONDS);
}
- private void cancelScheduledTaskForSubscriptionId(final String subscriptionId) {
+ private void cancelScheduledTask(final String taskKey) {
- final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionId.get(subscriptionId);
+ final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionIdAndEventType.get(taskKey);
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
- scheduledTasksPerSubscriptionId.remove(subscriptionId);
+ scheduledTasksPerSubscriptionIdAndEventType.remove(taskKey);
}
}
private void publishNcmpOutEventNow(final String subscriptionId, final String eventType,
final NcmpOutEvent ncmpOutEvent) {
final CloudEvent ncmpOutEventAsCloudEvent =
- buildAndGetNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
- ncmpOutEvent);
- eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId,
- ncmpOutEventAsCloudEvent);
+ buildAndGetNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType, ncmpOutEvent);
+ eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent);
dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
}
1 * mockDmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId)
}
+ def 'No event published when NCMP out event is null'() {
+ given: 'a cm subscription response for the client'
+ def subscriptionId = 'test-subscription-id-3'
+ def eventType = 'subscriptionCreateResponse'
+ def ncmpOutEvent = null
+ and: 'also we have target topic for publishing to client'
+ objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
+ and: 'a deadline to an event'
+ objectUnderTest.dmiOutEventTimeoutInMs = 1000
+ when: 'the event is scheduled to be published'
+ objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
+ then: 'we wait for 10ms and then we receive response from DMI'
+ Thread.sleep(10)
+ and: 'we receive NO response from DMI so we publish the message on demand'
+ objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
+ and: 'no event published'
+ 0 * mockEventsPublisher.publishCloudEvent(*_)
+ }
+
}