85ea1c6be33e88c0a87d140fd73b2035f6e27c3c
[cps.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
22
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.regex.Matcher;
31 import java.util.regex.Pattern;
32 import java.util.stream.Collectors;
33 import lombok.RequiredArgsConstructor;
34 import org.onap.cps.api.model.DataNode;
35 import org.onap.cps.ncmp.impl.datajobs.subscription.cache.DmiCacheHandler;
36 import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.Predicate;
37 import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiCmSubscriptionDetailsPerDmiMapper;
38 import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper;
39 import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventProducer;
40 import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
41 import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionDetails;
42 import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionKey;
43 import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionPredicate;
44 import org.onap.cps.ncmp.impl.datajobs.subscription.models.DmiCmSubscriptionTuple;
45 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_client.NcmpOutEvent;
46 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DmiInEvent;
47 import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
48 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
49 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
50 import org.springframework.stereotype.Service;
51
52 @Service
53 @RequiredArgsConstructor
54 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
55 public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
56
57     private static final Pattern SUBSCRIPTION_KEY_FROM_XPATH_PATTERN = Pattern.compile(
58         "^/datastores/datastore\\[@name='([^']*)']/cm-handles/cm-handle\\[@id='([^']*)']/"
59             + "filters/filter\\[@xpath='(.*)']$");
60
61     private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
62     private final CmSubscriptionComparator cmSubscriptionComparator;
63     private final NcmpOutEventMapper ncmpOutEventMapper;
64     private final DmiInEventMapper dmiInEventMapper;
65     private final DmiCmSubscriptionDetailsPerDmiMapper dmiCmSubscriptionDetailsPerDmiMapper;
66     private final NcmpOutEventProducer ncmpOutEventProducer;
67     private final DmiInEventProducer dmiInEventProducer;
68     private final DmiCacheHandler dmiCacheHandler;
69     private final InventoryPersistence inventoryPersistence;
70
71     @Override
72     public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
73         if (cmDataJobSubscriptionPersistenceService.isNewSubscriptionId(subscriptionId)) {
74             dmiCacheHandler.add(subscriptionId, predicates);
75             handleNewCmSubscription(subscriptionId);
76             scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse");
77         } else {
78             rejectAndSendCreateRequest(subscriptionId, predicates);
79         }
80     }
81
82     @Override
83     public void processSubscriptionDeleteRequest(final String subscriptionId) {
84         final Collection<DataNode> subscriptionDataNodes =
85             cmDataJobSubscriptionPersistenceService.getAffectedDataNodes(subscriptionId);
86         final DmiCmSubscriptionTuple dmiCmSubscriptionTuple =
87             getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes);
88         dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple));
89         if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) {
90             acceptAndSendDeleteRequest(subscriptionId);
91         } else {
92             sendSubscriptionDeleteRequestToDmi(subscriptionId,
93                 dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
94                     dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi()));
95             scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
96         }
97     }
98
99     private Map<String, DmiCmSubscriptionDetails> mergeDmiCmSubscriptionDetailsPerDmiMaps(
100         final DmiCmSubscriptionTuple dmiCmSubscriptionTuple) {
101         final Map<String, DmiCmSubscriptionDetails> lastRemainingDmiSubscriptionsPerDmi =
102             dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
103                 dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi());
104         final Map<String, DmiCmSubscriptionDetails> overlappingDmiSubscriptionsPerDmi =
105             dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
106                 dmiCmSubscriptionTuple.overlappingSubscriptionsPerDmi());
107         final Map<String, DmiCmSubscriptionDetails> mergedDmiSubscriptionsPerDmi =
108             new HashMap<>(lastRemainingDmiSubscriptionsPerDmi);
109         overlappingDmiSubscriptionsPerDmi.forEach((dmiServiceName, dmiCmSubscriptionDetails) ->
110             mergedDmiSubscriptionsPerDmi.merge(dmiServiceName, dmiCmSubscriptionDetails,
111                 this::mergeDmiCmSubscriptionDetails));
112         return mergedDmiSubscriptionsPerDmi;
113     }
114
115     private DmiCmSubscriptionDetails mergeDmiCmSubscriptionDetails(
116         final DmiCmSubscriptionDetails dmiCmSubscriptionDetails,
117         final DmiCmSubscriptionDetails otherDmiCmSubscriptionDetails) {
118         final List<DmiCmSubscriptionPredicate> mergedDmiCmSubscriptionPredicates =
119             new ArrayList<>(dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
120         mergedDmiCmSubscriptionPredicates.addAll(otherDmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
121         return new DmiCmSubscriptionDetails(mergedDmiCmSubscriptionPredicates, CmSubscriptionStatus.PENDING);
122     }
123
124     private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) {
125         ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, null, true);
126     }
127
128     private void rejectAndSendCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
129         final Set<String> subscriptionTargetFilters =
130             predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream())
131                 .collect(Collectors.toSet());
132         final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEventForRejectedRequest(subscriptionId,
133             new ArrayList<>(subscriptionTargetFilters));
134         ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
135     }
136
137     private void acceptAndSendDeleteRequest(final String subscriptionId) {
138         final Set<String> dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet();
139         for (final String dmiServiceName : dmiServiceNames) {
140             dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName,
141                 CmSubscriptionStatus.ACCEPTED);
142             dmiCacheHandler.removeFromDatabase(subscriptionId, dmiServiceName);
143         }
144         final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
145             dmiCacheHandler.get(subscriptionId));
146         ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent,
147             false);
148     }
149
150     private void handleNewCmSubscription(final String subscriptionId) {
151         final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
152             dmiCacheHandler.get(subscriptionId);
153         dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
154             final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
155                 cmSubscriptionComparator.getNewDmiSubscriptionPredicates(
156                     dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
157
158             if (dmiCmSubscriptionPredicates.isEmpty()) {
159                 acceptAndPersistCmSubscriptionPerDmi(subscriptionId, dmiPluginName);
160             } else {
161                 sendDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates);
162             }
163         });
164     }
165
166     private void sendDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName,
167                                       final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
168         final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates);
169         dmiInEventProducer.sendDmiInEvent(subscriptionId, dmiPluginName,
170             "subscriptionCreateRequest", dmiInEvent);
171     }
172
173     private void acceptAndPersistCmSubscriptionPerDmi(final String subscriptionId, final String dmiPluginName) {
174         dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
175             CmSubscriptionStatus.ACCEPTED);
176         dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
177     }
178
179     private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId,
180                                                     final Map<String, DmiCmSubscriptionDetails>
181                                                         dmiCmSubscriptionsPerDmi) {
182         dmiCmSubscriptionsPerDmi.forEach((dmiPluginName, dmiCmSubscriptionDetails) -> {
183             final DmiInEvent dmiInEvent =
184                 dmiInEventMapper.toDmiInEvent(
185                     dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
186             dmiInEventProducer.sendDmiInEvent(subscriptionId,
187                 dmiPluginName, "subscriptionDeleteRequest", dmiInEvent);
188         });
189     }
190
191
192     private DmiCmSubscriptionTuple getLastRemainingAndOverlappingSubscriptionsPerDmi(
193         final Collection<DataNode> subscriptionNodes) {
194         final Map<String, Collection<DmiCmSubscriptionKey>> lastRemainingSubscriptionsPerDmi = new HashMap<>();
195         final Map<String, Collection<DmiCmSubscriptionKey>> overlappingSubscriptionsPerDmi = new HashMap<>();
196
197         for (final DataNode subscriptionNode : subscriptionNodes) {
198             final DmiCmSubscriptionKey dmiCmSubscriptionKey = extractCmSubscriptionKey(subscriptionNode.getXpath());
199             final String dmiServiceName = inventoryPersistence.getYangModelCmHandle(
200                 dmiCmSubscriptionKey.cmHandleId()).getDmiServiceName();
201             @SuppressWarnings("unchecked") final List<String> subscribers =
202                 (List<String>) subscriptionNode.getLeaves().get("subscriptionIds");
203             populateDmiCmSubscriptionTuple(subscribers, overlappingSubscriptionsPerDmi,
204                 lastRemainingSubscriptionsPerDmi, dmiServiceName, dmiCmSubscriptionKey);
205         }
206         return new DmiCmSubscriptionTuple(lastRemainingSubscriptionsPerDmi, overlappingSubscriptionsPerDmi);
207     }
208
209     private static void populateDmiCmSubscriptionTuple(final List<String> subscribers,
210                                                        final Map<String, Collection<DmiCmSubscriptionKey>>
211                                                            overlappingSubscriptionsPerDmi,
212                                                        final Map<String, Collection<DmiCmSubscriptionKey>>
213                                                            lastRemainingSubscriptionsPerDmi,
214                                                        final String dmiServiceName,
215                                                        final DmiCmSubscriptionKey dmiCmSubscriptionKey) {
216         final Map<String, Collection<DmiCmSubscriptionKey>> targetMap =
217             subscribers.size() > 1 ? overlappingSubscriptionsPerDmi : lastRemainingSubscriptionsPerDmi;
218         targetMap.computeIfAbsent(dmiServiceName, dmiName -> new HashSet<>()).add(dmiCmSubscriptionKey);
219     }
220
221     private DmiCmSubscriptionKey extractCmSubscriptionKey(final String xpath) {
222         final Matcher matcher = SUBSCRIPTION_KEY_FROM_XPATH_PATTERN.matcher(xpath);
223         if (matcher.find()) {
224             final String datastoreName = matcher.group(1);
225             final String cmHandleId = matcher.group(2);
226             final String filterXpath = matcher.group(3);
227             return new DmiCmSubscriptionKey(datastoreName, cmHandleId, filterXpath);
228         }
229         throw new IllegalArgumentException("DataNode xpath does not represent a subscription key");
230     }
231
232 }