Scheduled task for Subscription Response 16/137516/13
authormpriyank <priyank.maheshwari@est.tech>
Thu, 14 Mar 2024 09:11:27 +0000 (09:11 +0000)
committermpriyank <priyank.maheshwari@est.tech>
Wed, 27 Mar 2024 12:07:40 +0000 (12:07 +0000)
- Need to send the response back to the client in max 30 secs
- Also have the capability to send the response right away
- Testware added for the same
- Also added code to cancel the scheduled task
- Added state for cancelling the task

Issue-ID: CPS-2140
Change-Id: I3ab321d8221cd8f697c26be46d2e63d89b360923
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy

index 21468c3..76ee08e 100644 (file)
@@ -23,10 +23,18 @@ package org.onap.cps.ncmp.api.impl.events.cmsubscription;
 import io.cloudevents.CloudEvent;
 import io.cloudevents.core.builder.CloudEventBuilder;
 import java.net.URI;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
 import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
 import org.onap.cps.utils.JsonObjectMapper;
 import org.springframework.beans.factory.annotation.Value;
@@ -39,37 +47,87 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class CmNotificationSubscriptionNcmpOutEventProducer {
 
-    private final EventsPublisher<CloudEvent> eventsPublisher;
-    private final JsonObjectMapper jsonObjectMapper;
-
     @Value("${app.ncmp.avc.subscription-outcome-topic}")
     private String cmNotificationSubscriptionNcmpOutEventTopic;
 
+    @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}")
+    private Integer cmNotificationSubscriptionDmiOutEventTimeoutInMs;
+
+    private final EventsPublisher<CloudEvent> eventsPublisher;
+    private final JsonObjectMapper jsonObjectMapper;
+    private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
+    private final CmNotificationSubscriptionNcmpOutEventMapper cmNotificationSubscriptionNcmpOutEventMapper;
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>();
+
     /**
      * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
      * Event compliant.
      *
      * @param subscriptionId                         Cm Subscription Id
      * @param eventType                              Type of event
-     * @param cmNotificationSubscriptionNcmpOutEvent Cm Notification Subscription Event for the client
+     * @param cmNotificationSubscriptionNcmpOutEvent Cm Notification Subscription Event for the
+     *                                               client
+     * @param isScheduledEvent                       Determines if the event is to be scheduled
+     *                                               or published now
      */
     public void publishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, final String eventType,
-            final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) {
+            final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent,
+            final boolean isScheduledEvent) {
 
-        eventsPublisher.publishCloudEvent(cmNotificationSubscriptionNcmpOutEventTopic, subscriptionId,
-                buildAndGetCmNotificationNcmpOutEventAsCloudEvent(subscriptionId, eventType,
-                        cmNotificationSubscriptionNcmpOutEvent));
+        if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) {
+            final ScheduledFuture<?> scheduledFuture =
+                    scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType);
+            scheduledTasksPerSubscriptionId.putIfAbsent(subscriptionId, scheduledFuture);
+            log.debug("Scheduled the CmNotificationSubscriptionEvent for subscriptionId : {}", subscriptionId);
+        } else {
+            cancelScheduledTaskForSubscriptionId(subscriptionId);
+            publishCmNotificationSubscriptionNcmpOutEventNow(subscriptionId, eventType,
+                    cmNotificationSubscriptionNcmpOutEvent);
+            log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId);
+        }
+
+    }
+
+    private ScheduledFuture<?> scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId,
+            final String eventType) {
+        final CmNotificationSubscriptionNcmpOutEventPublishingTask
+                cmNotificationSubscriptionNcmpOutEventPublishingTask =
+                new CmNotificationSubscriptionNcmpOutEventPublishingTask(cmNotificationSubscriptionNcmpOutEventTopic,
+                        subscriptionId, eventType, eventsPublisher, jsonObjectMapper, cmNotificationSubscriptionCache,
+                        cmNotificationSubscriptionNcmpOutEventMapper);
+        return scheduledExecutorService.schedule(cmNotificationSubscriptionNcmpOutEventPublishingTask,
+                cmNotificationSubscriptionDmiOutEventTimeoutInMs, TimeUnit.MILLISECONDS);
+    }
+
+    private void cancelScheduledTaskForSubscriptionId(final String subscriptionId) {
+
+        final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionId.get(subscriptionId);
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(true);
+            scheduledTasksPerSubscriptionId.remove(subscriptionId);
+        }
 
     }
 
-    private CloudEvent buildAndGetCmNotificationNcmpOutEventAsCloudEvent(final String subscriptionId,
-            final String eventType,
+
+    private void publishCmNotificationSubscriptionNcmpOutEventNow(final String subscriptionId, final String eventType,
+            final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) {
+        final CloudEvent cmNotificationSubscriptionNcmpOutEventAsCloudEvent =
+                buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
+                        cmNotificationSubscriptionNcmpOutEvent);
+        eventsPublisher.publishCloudEvent(cmNotificationSubscriptionNcmpOutEventTopic, subscriptionId,
+                cmNotificationSubscriptionNcmpOutEventAsCloudEvent);
+    }
+
+    protected static CloudEvent buildAndGetCmNotificationNcmpOutEventAsCloudEvent(
+            final JsonObjectMapper jsonObjectMapper, final String subscriptionId, final String eventType,
             final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) {
 
         return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType)
-                .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0"))
-                .withExtension("correlationid", subscriptionId)
-                .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build();
+                       .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0"))
+                       .withExtension("correlationid", subscriptionId)
+                       .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build();
     }
 
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java
new file mode 100644 (file)
index 0000000..f7ea4a4
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
+
+import static org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionNcmpOutEventProducer.buildAndGetCmNotificationNcmpOutEventAsCloudEvent;
+
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
+import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
+import org.onap.cps.utils.JsonObjectMapper;
+
+@Slf4j
+@RequiredArgsConstructor
+public class CmNotificationSubscriptionNcmpOutEventPublishingTask implements Runnable {
+
+
+    private final String topicName;
+    private final String subscriptionId;
+    private final String eventType;
+    private final EventsPublisher<CloudEvent> eventsPublisher;
+    private final JsonObjectMapper jsonObjectMapper;
+    private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
+    private final CmNotificationSubscriptionNcmpOutEventMapper cmNotificationSubscriptionNcmpOutEventMapper;
+
+    /**
+     * Delegating the responsibility of publishing CmNotificationSubscriptionNcmpOutEvent as a separate task which will
+     * be called after a specified delay.
+     */
+    @Override
+    public void run() {
+        final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap =
+                cmNotificationSubscriptionCache.get(subscriptionId);
+        final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
+                cmNotificationSubscriptionNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
+                        dmiCmNotificationSubscriptionDetailsMap);
+        eventsPublisher.publishCloudEvent(topicName, subscriptionId,
+                buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
+                        cmNotificationSubscriptionNcmpOutEvent));
+    }
+
+}
index 7c1a148..970d7e6 100644 (file)
@@ -3,6 +3,8 @@ package org.onap.cps.ncmp.api.impl.events.cmsubscription
 import com.fasterxml.jackson.databind.ObjectMapper
 import io.cloudevents.CloudEvent
 import org.onap.cps.events.EventsPublisher
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails
 import org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper
 import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent
 import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.Data
@@ -13,18 +15,58 @@ class CmNotificationSubscriptionNcmpOutEventProducerSpec extends Specification {
 
     def mockEventsPublisher = Mock(EventsPublisher)
     def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
+    def mockCmNotificationSubscriptionCache = Mock(Map<String, Map<String, DmiCmNotificationSubscriptionDetails>>)
+    def mockCmNotificationSubscriptionNcmpOutEventMapper = Mock(CmNotificationSubscriptionNcmpOutEventMapper)
 
-    def objectUnderTest = new CmNotificationSubscriptionNcmpOutEventProducer(mockEventsPublisher, jsonObjectMapper)
+    def objectUnderTest = new CmNotificationSubscriptionNcmpOutEventProducer(mockEventsPublisher, jsonObjectMapper, mockCmNotificationSubscriptionCache, mockCmNotificationSubscriptionNcmpOutEventMapper)
 
-    def 'Create and Publish Cm Notification Subscription DMI In Event'() {
+    def 'Create and #scenario Cm Notification Subscription NCMP out event'() {
         given: 'a cm subscription response for the client'
-            def subscriptionId = 'test-subscription-id'
+            def subscriptionId = 'test-subscription-id-2'
             def eventType = 'subscriptionCreateResponse'
-            def cmNotificationSubscriptionNcmpOutEvent = new CmNotificationSubscriptionNcmpOutEvent(data: new Data(subscriptionId: 'sub-1', acceptedTargets: ['ch-1', 'ch-2']))
+            def cmNotificationSubscriptionNcmpOutEvent = new CmNotificationSubscriptionNcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-2', acceptedTargets: ['ch-1', 'ch-2']))
         and: 'also we have target topic for publishing to client'
             objectUnderTest.cmNotificationSubscriptionNcmpOutEventTopic = 'client-test-topic'
+        and: 'a deadline to an event'
+            objectUnderTest.cmNotificationSubscriptionDmiOutEventTimeoutInMs = 1000
         when: 'the event is published'
-            objectUnderTest.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType, cmNotificationSubscriptionNcmpOutEvent)
+            objectUnderTest.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType, cmNotificationSubscriptionNcmpOutEvent, eventPublishingTaskToBeScheduled)
+        then: 'we conditionally wait for a while'
+            Thread.sleep(delayInMs)
+        then: 'the event contains the required attributes'
+            1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
+                args ->
+                    {
+                        assert args[0] == 'client-test-topic'
+                        assert args[1] == subscriptionId
+                        def cmNotificationSubscriptionNcmpOutEventAsCloudEvent = (args[2] as CloudEvent)
+                        assert cmNotificationSubscriptionNcmpOutEventAsCloudEvent.getExtension('correlationid') == subscriptionId
+                        assert cmNotificationSubscriptionNcmpOutEventAsCloudEvent.type == 'subscriptionCreateResponse'
+                        assert cmNotificationSubscriptionNcmpOutEventAsCloudEvent.source.toString() == 'NCMP'
+                        assert CloudEventMapper.toTargetEvent(cmNotificationSubscriptionNcmpOutEventAsCloudEvent, CmNotificationSubscriptionNcmpOutEvent) == cmNotificationSubscriptionNcmpOutEvent
+                    }
+            }
+        where: 'following scenarios are considered'
+            scenario                                          | delayInMs | eventPublishingTaskToBeScheduled
+            'publish event now'                               | 0         | false
+            'schedule and publish after the configured time ' | 1500      | true
+    }
+
+    def 'Schedule Cm Notification Subscription NCMP out event but later publish it on demand'() {
+        given: 'a cm subscription response for the client'
+            def subscriptionId = 'test-subscription-id-3'
+            def eventType = 'subscriptionCreateResponse'
+            def cmNotificationSubscriptionNcmpOutEvent = new CmNotificationSubscriptionNcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-3', acceptedTargets: ['ch-2', 'ch-3']))
+        and: 'also we have target topic for publishing to client'
+            objectUnderTest.cmNotificationSubscriptionNcmpOutEventTopic = 'client-test-topic'
+        and: 'a deadline to an event'
+            objectUnderTest.cmNotificationSubscriptionDmiOutEventTimeoutInMs = 1000
+        when: 'the event is scheduled to be published'
+            objectUnderTest.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType, cmNotificationSubscriptionNcmpOutEvent, true)
+        then: 'we wait for 10ms and then we receive response from DMI'
+            Thread.sleep(10)
+        and: 'we receive response from DMI so we publish the message on demand'
+            objectUnderTest.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType, cmNotificationSubscriptionNcmpOutEvent, false)
         then: 'the event contains the required attributes'
             1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
                 args ->