8cfb3ad563fe7ff4b2c0aa2760d87638e7191baa
[cps.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2024-2025 Nordix Foundation
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
22
23 import static org.onap.cps.ncmp.events.NcmpEventDataSchema.SUBSCRIPTIONS_V1;
24
25 import io.cloudevents.CloudEvent;
26 import java.util.Map;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.ScheduledFuture;
31 import java.util.concurrent.TimeUnit;
32 import lombok.RequiredArgsConstructor;
33 import lombok.extern.slf4j.Slf4j;
34 import org.onap.cps.events.EventsPublisher;
35 import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
36 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
37 import org.onap.cps.ncmp.utils.events.NcmpEvent;
38 import org.springframework.beans.factory.annotation.Value;
39 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
40 import org.springframework.stereotype.Component;
41
42 @Component
43 @Slf4j
44 @RequiredArgsConstructor
45 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
46 public class NcmpOutEventProducer {
47
48     @Value("${app.ncmp.avc.cm-subscription-ncmp-out}")
49     private String ncmpOutEventTopic;
50
51     @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}")
52     private Integer dmiOutEventTimeoutInMs;
53
54     private final EventsPublisher<CloudEvent> eventsPublisher;
55     private final NcmpOutEventMapper ncmpOutEventMapper;
56     private final DmiCacheHandler dmiCacheHandler;
57     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
58     private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionIdAndEventType =
59             new ConcurrentHashMap<>();
60
61     /**
62      * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
63      * Event compliant.
64      *
65      * @param subscriptionId   Cm Subscription Id
66      * @param eventType        Type of event
67      * @param ncmpOutEvent     Cm Notification Subscription Event for the
68      *                         client
69      * @param isScheduledEvent Determines if the event is to be scheduled
70      *                         or published now
71      */
72     public void publishNcmpOutEvent(final String subscriptionId, final String eventType,
73             final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
74
75         final String taskKey = subscriptionId.concat(eventType);
76
77         if (isScheduledEvent && !scheduledTasksPerSubscriptionIdAndEventType.containsKey(taskKey)) {
78             final ScheduledFuture<?> scheduledFuture = scheduleAndPublishNcmpOutEvent(subscriptionId, eventType);
79             scheduledTasksPerSubscriptionIdAndEventType.putIfAbsent(taskKey, scheduledFuture);
80             log.debug("Scheduled the Cm Subscription Event for subscriptionId : {} and eventType : {}", subscriptionId,
81                     eventType);
82         } else {
83             cancelScheduledTask(taskKey);
84             if (ncmpOutEvent != null) {
85                 publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
86                 log.debug("Published Cm Subscription Event on demand for subscriptionId : {} and eventType : {}",
87                         subscriptionId, eventType);
88             }
89         }
90     }
91
92     /**
93      * Get an NCMP out event as cloud event.
94      *
95      * @param subscriptionId   subscription id
96      * @param eventType        event type
97      * @param ncmpOutEvent     cm notification subscription NCMP out event
98      * @return cm notification subscription NCMP out event as cloud event
99      */
100     public static CloudEvent buildAndGetNcmpOutEventAsCloudEvent(
101             final String subscriptionId, final String eventType, final NcmpOutEvent ncmpOutEvent) {
102
103         return NcmpEvent.builder()
104                        .type(eventType)
105                        .dataSchema(SUBSCRIPTIONS_V1.getDataSchema())
106                        .extensions(Map.of("correlationid", subscriptionId))
107                        .data(ncmpOutEvent)
108                        .build()
109                        .asCloudEvent();
110     }
111
112     private ScheduledFuture<?> scheduleAndPublishNcmpOutEvent(final String subscriptionId, final String eventType) {
113         final NcmpOutEventPublishingTask ncmpOutEventPublishingTask =
114                 new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsPublisher,
115                         ncmpOutEventMapper, dmiCacheHandler);
116         return scheduledExecutorService.schedule(ncmpOutEventPublishingTask, dmiOutEventTimeoutInMs,
117                 TimeUnit.MILLISECONDS);
118     }
119
120     private void cancelScheduledTask(final String taskKey) {
121         final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionIdAndEventType.get(taskKey);
122         if (scheduledFuture != null) {
123             scheduledFuture.cancel(true);
124             scheduledTasksPerSubscriptionIdAndEventType.remove(taskKey);
125         }
126     }
127
128     private void publishNcmpOutEventNow(final String subscriptionId, final String eventType,
129             final NcmpOutEvent ncmpOutEvent) {
130         final CloudEvent ncmpOutEventAsCloudEvent =
131                 buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent);
132         eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent);
133         dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
134     }
135
136 }