Merge "Forward Subscription Information to DMI Plugin(s)"
[cps.git] / cps-service / src / main / java / org / onap / cps / api / impl / CpsDataServiceImpl.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2021-2023 Nordix Foundation
4  *  Modifications Copyright (C) 2020-2022 Bell Canada.
5  *  Modifications Copyright (C) 2021 Pantheon.tech
6  *  Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7  *  Modifications Copyright (C) 2022 Deutsche Telekom AG
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *
13  *        http://www.apache.org/licenses/LICENSE-2.0
14  *
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *
21  *  SPDX-License-Identifier: Apache-2.0
22  *  ============LICENSE_END=========================================================
23  */
24
25 package org.onap.cps.api.impl;
26
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
30
31 import io.micrometer.core.annotation.Timed;
32 import java.time.OffsetDateTime;
33 import java.util.Collection;
34 import java.util.Map;
35 import java.util.stream.Collectors;
36 import lombok.RequiredArgsConstructor;
37 import lombok.extern.slf4j.Slf4j;
38 import org.onap.cps.api.CpsAdminService;
39 import org.onap.cps.api.CpsDataService;
40 import org.onap.cps.notification.NotificationService;
41 import org.onap.cps.notification.Operation;
42 import org.onap.cps.spi.CpsDataPersistenceService;
43 import org.onap.cps.spi.FetchDescendantsOption;
44 import org.onap.cps.spi.exceptions.DataValidationException;
45 import org.onap.cps.spi.model.Anchor;
46 import org.onap.cps.spi.model.DataNode;
47 import org.onap.cps.spi.model.DataNodeBuilder;
48 import org.onap.cps.spi.utils.CpsValidator;
49 import org.onap.cps.utils.ContentType;
50 import org.onap.cps.utils.TimedYangParser;
51 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.springframework.stereotype.Service;
54
55 @Service
56 @Slf4j
57 @RequiredArgsConstructor
58 public class CpsDataServiceImpl implements CpsDataService {
59
60     private static final String ROOT_NODE_XPATH = "/";
61     private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
62
63     private final CpsDataPersistenceService cpsDataPersistenceService;
64     private final CpsAdminService cpsAdminService;
65     private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
66     private final NotificationService notificationService;
67     private final CpsValidator cpsValidator;
68     private final TimedYangParser timedYangParser;
69
70     @Override
71     public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
72         final OffsetDateTime observedTimestamp) {
73         saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
74     }
75
76     @Override
77     @Timed(value = "cps.data.service.datanode.root.save",
78         description = "Time taken to save a root data node")
79     public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
80                          final OffsetDateTime observedTimestamp, final ContentType contentType) {
81         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
82         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
83         final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
84         cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
85         processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
86     }
87
88     @Override
89     public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
90                          final String nodeData, final OffsetDateTime observedTimestamp) {
91         saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
92     }
93
94     @Override
95     @Timed(value = "cps.data.service.datanode.child.save",
96         description = "Time taken to save a child data node")
97     public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
98                          final String nodeData, final OffsetDateTime observedTimestamp,
99                          final ContentType contentType) {
100         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
101         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
102         final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
103         cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
104         processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
105     }
106
107     @Override
108     @Timed(value = "cps.data.service.list.element.save",
109         description = "Time taken to save a list element")
110     public void saveListElements(final String dataspaceName, final String anchorName,
111         final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
112         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
113         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
114         final Collection<DataNode> listElementDataNodeCollection =
115             buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
116         cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
117             listElementDataNodeCollection);
118         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
119     }
120
121     @Override
122     @Timed(value = "cps.data.service.list.element.batch.save",
123         description = "Time taken to save a batch of list elements")
124     public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
125             final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
126         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
127         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
128         final Collection<Collection<DataNode>> listElementDataNodeCollections =
129                 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
130         cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
131                 listElementDataNodeCollections);
132         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
133     }
134
135     @Override
136     @Timed(value = "cps.data.service.datanode.get",
137             description = "Time taken to get data nodes for an xpath")
138     public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
139                                              final String xpath,
140                                              final FetchDescendantsOption fetchDescendantsOption) {
141         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
142         return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
143     }
144
145     @Override
146     @Timed(value = "cps.data.service.datanode.batch.get",
147         description = "Time taken to get a batch of data nodes")
148     public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
149                                                               final Collection<String> xpaths,
150                                                               final FetchDescendantsOption fetchDescendantsOption) {
151         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
152         return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
153                 fetchDescendantsOption);
154     }
155
156     @Override
157     @Timed(value = "cps.data.service.datanode.leaves.update",
158         description = "Time taken to get a batch of data nodes")
159     public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
160         final String jsonData, final OffsetDateTime observedTimestamp) {
161         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
162         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
163         final DataNode dataNode = buildDataNode(anchor, parentNodeXpath, jsonData, ContentType.JSON);
164         cpsDataPersistenceService.updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(),
165             dataNode.getLeaves());
166         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
167     }
168
169     @Override
170     @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
171         description = "Time taken to update data node leaves and existing descendants leaves")
172     public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
173         final String parentNodeXpath,
174         final String dataNodeUpdatesAsJson,
175         final OffsetDateTime observedTimestamp) {
176         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
177         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
178         final Collection<DataNode> dataNodeUpdates =
179             buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
180         for (final DataNode dataNodeUpdate : dataNodeUpdates) {
181             processDataNodeUpdate(anchor, dataNodeUpdate);
182         }
183         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
184     }
185
186     @Override
187     public String startSession() {
188         return cpsDataPersistenceService.startSession();
189     }
190
191     @Override
192     public void closeSession(final String sessionId) {
193         cpsDataPersistenceService.closeSession(sessionId);
194     }
195
196     @Override
197     public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
198         lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
199     }
200
201     @Override
202     public void lockAnchor(final String sessionID, final String dataspaceName,
203                            final String anchorName, final Long timeoutInMilliseconds) {
204         cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
205     }
206
207     @Override
208     @Timed(value = "cps.data.service.datanode.descendants.update",
209         description = "Time taken to update a data node and descendants")
210     public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
211                                              final String parentNodeXpath, final String jsonData,
212                                              final OffsetDateTime observedTimestamp) {
213         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
214         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
215         final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
216         cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
217         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
218     }
219
220     @Override
221     @Timed(value = "cps.data.service.datanode.descendants.batch.update",
222         description = "Time taken to update a batch of data nodes and descendants")
223     public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
224                                               final Map<String, String> nodesJsonData,
225                                               final OffsetDateTime observedTimestamp) {
226         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
227         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
228         final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
229         cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
230         nodesJsonData.keySet().forEach(nodeXpath ->
231             processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
232     }
233
234     @Override
235     @Timed(value = "cps.data.service.list.update",
236         description = "Time taken to update a list")
237     public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
238             final String jsonData, final OffsetDateTime observedTimestamp) {
239         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
240         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
241         final Collection<DataNode> newListElements =
242             buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
243         replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
244     }
245
246     @Override
247     @Timed(value = "cps.data.service.list.batch.update",
248         description = "Time taken to update a batch of lists")
249     public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
250             final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
251         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
252         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
253         cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
254         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
255     }
256
257     @Override
258     @Timed(value = "cps.data.service.datanode.delete",
259         description = "Time taken to delete a datanode")
260     public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
261                                final OffsetDateTime observedTimestamp) {
262         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
263         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
264         cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
265         processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
266     }
267
268     @Override
269     @Timed(value = "cps.data.service.datanode.batch.delete",
270         description = "Time taken to delete a batch of datanodes")
271     public void deleteDataNodes(final String dataspaceName, final String anchorName,
272                                 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
273         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
274         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
275         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
276         dataNodeXpaths.forEach(dataNodeXpath ->
277             processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
278     }
279
280     @Override
281     @Timed(value = "cps.data.service.datanode.delete.anchor",
282         description = "Time taken to delete all datanodes for an anchor")
283     public void deleteDataNodes(final String dataspaceName, final String anchorName,
284                                 final OffsetDateTime observedTimestamp) {
285         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
286         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
287         processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
288         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
289     }
290
291     @Override
292     @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
293         description = "Time taken to delete all datanodes for multiple anchors")
294     public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
295                                 final OffsetDateTime observedTimestamp) {
296         cpsValidator.validateNameCharacters(dataspaceName);
297         cpsValidator.validateNameCharacters(anchorNames);
298         for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
299             processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
300         }
301         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
302     }
303
304     @Override
305     @Timed(value = "cps.data.service.list.delete",
306         description = "Time taken to delete a list or list element")
307     public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
308         final OffsetDateTime observedTimestamp) {
309         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
310         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
311         cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
312         processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
313     }
314
315     private DataNode buildDataNode(final Anchor anchor, final String parentNodeXpath, final String nodeData,
316                                    final ContentType contentType) {
317         final SchemaContext schemaContext = getSchemaContext(anchor);
318
319         if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
320             final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
321             return new DataNodeBuilder().withContainerNode(containerNode).build();
322         }
323
324         final ContainerNode containerNode =
325             timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
326
327         return new DataNodeBuilder()
328                 .withParentNodeXpath(parentNodeXpath)
329                 .withContainerNode(containerNode)
330                 .build();
331     }
332
333     private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
334         return nodesJsonData.entrySet().stream().map(nodeJsonData ->
335             buildDataNode(anchor, nodeJsonData.getKey(),
336                 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
337     }
338
339     private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
340                                                 final String nodeData, final ContentType contentType) {
341         final SchemaContext schemaContext = getSchemaContext(anchor);
342
343         if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
344             final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
345             final Collection<DataNode> dataNodes = new DataNodeBuilder()
346                     .withContainerNode(containerNode)
347                     .buildCollection();
348             if (dataNodes.isEmpty()) {
349                 throw new DataValidationException("Invalid data.", "No data nodes provided");
350             }
351             return dataNodes;
352         }
353         final ContainerNode containerNode =
354             timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
355         final Collection<DataNode> dataNodes = new DataNodeBuilder()
356             .withParentNodeXpath(parentNodeXpath)
357             .withContainerNode(containerNode)
358             .buildCollection();
359         if (dataNodes.isEmpty()) {
360             throw new DataValidationException("Invalid data.", "No data nodes provided");
361         }
362         return dataNodes;
363     }
364
365     private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
366                                                             final Collection<String> nodeDataList,
367                                                             final ContentType contentType) {
368         return nodeDataList.stream()
369             .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
370             .collect(Collectors.toList());
371     }
372
373     private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
374                                               final Operation operation, final OffsetDateTime observedTimestamp) {
375         try {
376             notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
377         } catch (final Exception exception) {
378             //If async message can't be queued for notification service, the initial request should not failed.
379             log.error("Failed to send message to notification service", exception);
380         }
381     }
382
383     private SchemaContext getSchemaContext(final Anchor anchor) {
384         return yangTextSchemaSourceSetCache
385             .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
386     }
387
388     private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
389         if (dataNodeUpdate == null) {
390             return;
391         }
392         cpsDataPersistenceService.updateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
393             dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves());
394         final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
395         for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
396             processDataNodeUpdate(anchor, childDataNodeUpdate);
397         }
398     }
399
400 }