2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
4 * Modifications Copyright (C) 2024 TechMahindra Ltd.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.cps.ncmp.impl.datajobs.subscription.utils;
24 import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS;
25 import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.UNKNOWN;
27 import java.io.Serializable;
28 import java.time.OffsetDateTime;
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.List;
34 import lombok.RequiredArgsConstructor;
35 import lombok.extern.slf4j.Slf4j;
36 import org.onap.cps.api.CpsDataService;
37 import org.onap.cps.api.CpsQueryService;
38 import org.onap.cps.api.model.DataNode;
39 import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
40 import org.onap.cps.utils.ContentType;
41 import org.onap.cps.utils.JsonObjectMapper;
42 import org.springframework.stereotype.Service;
46 @RequiredArgsConstructor
47 public class CmDataJobSubscriptionPersistenceService {
49 private static final String DATASPACE = "NCMP-Admin";
50 private static final String ANCHOR = "cm-data-job-subscriptions";
52 private static final String PARENT_NODE_XPATH = "/dataJob";
53 private static final String CPS_PATH_FOR_SUBSCRIPTION_NODE = "//subscription";
54 private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR =
55 CPS_PATH_FOR_SUBSCRIPTION_NODE + "[@dataNodeSelector='%s']";
56 private static final String CPS_PATH_FOR_SUBSCRIPTION_WITH_DATA_NODE_SELECTOR =
57 "/dataJob/subscription[@dataNodeSelector='%s']";
58 private static final String CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID =
59 CPS_PATH_FOR_SUBSCRIPTION_NODE + "/dataJobId[text()='%s']";
60 private static final String CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS =
61 CPS_PATH_FOR_SUBSCRIPTION_NODE + "[@status='UNKNOWN' or @status='REJECTED']/dataJobId[text()='%s']";
63 private final JsonObjectMapper jsonObjectMapper;
64 private final CpsQueryService cpsQueryService;
65 private final CpsDataService cpsDataService;
68 * Check if we have a cm data job subscription for the given data node selector.
70 * @param dataNodeSelector the target of the data job subscription
71 * @return true if the subscription details has at least one subscriber , otherwise false
73 public boolean hasAtLeastOneSubscription(final String dataNodeSelector) {
74 return !getSubscriptionIds(dataNodeSelector).isEmpty();
78 * Check if the input is a new subscription ID against ongoing subscriptions.
80 * @param subscriptionId subscription ID
81 * @return true if subscriptionId is not used in active subscriptions, otherwise false
83 public boolean isNewSubscriptionId(final String subscriptionId) {
84 final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
85 return cpsQueryService.queryDataNodes(DATASPACE, ANCHOR,
86 query, OMIT_DESCENDANTS).isEmpty();
90 * Get the ids for the subscriptions for the given data node selector.
92 * @param dataNodeSelector the target of the data job subscription
93 * @return collection of subscription ids of ongoing cm notification subscription
95 @SuppressWarnings("unchecked")
96 public Collection<String> getSubscriptionIds(final String dataNodeSelector) {
97 final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector);
98 final Collection<DataNode> existingNodes =
99 cpsQueryService.queryDataNodes(DATASPACE, ANCHOR,
100 query, OMIT_DESCENDANTS);
101 if (existingNodes.isEmpty()) {
102 return Collections.emptyList();
104 return (Collection<String>) existingNodes.iterator().next().getLeaves().get("dataJobId");
108 * Get data node selectors for subscriptions with subscription ID.
110 * @param subscriptionId subscription ID
111 * @return a list of dataNodeSelectors, or empty list if none found
113 public Collection<String> getDataNodeSelectors(final String subscriptionId) {
114 final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTION_WITH_DATA_JOB_ID.formatted(subscriptionId);
115 final Collection<DataNode> dataNodes =
116 cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query, OMIT_DESCENDANTS);
117 final List<String> dataNodeSelectors = new ArrayList<>();
118 for (final DataNode dataNode : dataNodes) {
119 final String dataNodeSelector = dataNode.getLeaves().get("dataNodeSelector").toString();
120 dataNodeSelectors.add(dataNodeSelector);
122 return dataNodeSelectors;
126 * Remove cm notification data job subscription.
128 * @param subscriptionId data job subscription id to be deleted
129 * @param dataNodeSelector the target of the data job subscription
131 public void delete(final String subscriptionId, final String dataNodeSelector) {
132 final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector);
133 final Collection<DataNode> dataNodes =
134 cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query, OMIT_DESCENDANTS);
135 final Collection<String> subscriptionIds = getSubscriptionIds(dataNodeSelector);
136 if (!subscriptionIds.remove(subscriptionId)) {
137 log.warn("SubscriptionId={} not found under {}={}", subscriptionId, "dataNodeSelector", dataNodeSelector);
140 if (subscriptionIds.isEmpty()) {
141 deleteEntireSubscription(dataNodeSelector);
143 final String currentStatus = dataNodes.iterator().next().getLeaves().get("status").toString();
144 updateSubscriptionDetails(dataNodeSelector, subscriptionIds, currentStatus);
149 * Delete the entire subscription.
151 * @param dataNodeSelector data node selector
153 public void deleteEntireSubscription(final String dataNodeSelector) {
154 final String query = CPS_PATH_FOR_SUBSCRIPTION_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector);
155 cpsDataService.deleteDataNode(DATASPACE, ANCHOR, query, OffsetDateTime.now());
159 * Get data node selectors for subscriptions with status UNKNOWN or REJECTED.
161 * @param subscriptionId subscription ID
162 * @return a list of data node selectors
164 public List<String> getInactiveDataNodeSelectors(final String subscriptionId) {
165 final String query = CPS_PATH_TEMPLATE_FOR_INACTIVE_SUBSCRIPTIONS.formatted(subscriptionId);
166 final Collection<DataNode> dataNodes = cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query,
168 final List<String> dataNodeSelectors = new ArrayList<>(dataNodes.size());
169 for (final DataNode dataNode : dataNodes) {
170 final String dataNodeSelector = dataNode.getLeaves().get("dataNodeSelector").toString();
171 dataNodeSelectors.add(dataNodeSelector);
173 return dataNodeSelectors;
177 * Add cm notification data job subscription.
179 * @param subscriptionId data job subscription id to be added
180 * @param dataNodeSelector the target of the data job subscription
182 public void add(final String subscriptionId, final String dataNodeSelector) {
183 final String query = CPS_PATH_TEMPLATE_FOR_SUBSCRIPTIONS_WITH_DATA_NODE_SELECTOR.formatted(dataNodeSelector);
184 final Collection<DataNode> dataNodes = cpsQueryService.queryDataNodes(DATASPACE, ANCHOR, query,
186 if (dataNodes.isEmpty()) {
187 addNewSubscriptionDetails(subscriptionId, dataNodeSelector);
189 final Collection<String> subscriptionIds = getSubscriptionIds(dataNodeSelector);
190 final String cmSubscriptionStatusName = dataNodes.iterator().next().getLeaves().get("status").toString();
191 subscriptionIds.add(subscriptionId);
192 updateSubscriptionDetails(dataNodeSelector, subscriptionIds, cmSubscriptionStatusName);
197 * Update status of a subscription.
199 * @param dataNodeSelector data node selector
200 * @param cmSubscriptionStatus cm subscription status
202 public void updateCmSubscriptionStatus(final String dataNodeSelector,
203 final CmSubscriptionStatus cmSubscriptionStatus) {
204 final Collection<String> subscriptionIds = getSubscriptionIds(dataNodeSelector);
205 updateSubscriptionDetails(dataNodeSelector, subscriptionIds, cmSubscriptionStatus.name());
208 private void addNewSubscriptionDetails(final String subscriptionId,
209 final String dataNodeSelector) {
210 final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
211 final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector,
212 newSubscriptionList, UNKNOWN.name());
213 cpsDataService.saveData(DATASPACE, ANCHOR, PARENT_NODE_XPATH, subscriptionDetailsAsJson,
214 OffsetDateTime.now(), ContentType.JSON);
217 private void updateSubscriptionDetails(final String dataNodeSelector, final Collection<String> subscriptionIds,
218 final String cmSubscriptionStatusName) {
219 final String subscriptionDetailsAsJson = createSubscriptionDetailsAsJson(dataNodeSelector,
220 subscriptionIds, cmSubscriptionStatusName);
221 cpsDataService.updateNodeLeaves(DATASPACE, ANCHOR, PARENT_NODE_XPATH, subscriptionDetailsAsJson,
222 OffsetDateTime.now(),
226 private String createSubscriptionDetailsAsJson(final String dataNodeSelector,
227 final Collection<String> subscriptionIds,
228 final String cmSubscriptionStatusName) {
229 final Map<String, Serializable> subscriptionDetailsAsMap =
230 Map.of("dataNodeSelector", dataNodeSelector,
231 "dataJobId", (Serializable) subscriptionIds,
232 "status", cmSubscriptionStatusName);
233 return "{\"subscription\":[" + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap) + "]}";