Merge "Update write performance test timings"
[cps.git] / cps-service / src / main / java / org / onap / cps / api / impl / CpsDataServiceImpl.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2021-2023 Nordix Foundation
4  *  Modifications Copyright (C) 2020-2022 Bell Canada.
5  *  Modifications Copyright (C) 2021 Pantheon.tech
6  *  Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7  *  Modifications Copyright (C) 2022 Deutsche Telekom AG
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *
13  *        http://www.apache.org/licenses/LICENSE-2.0
14  *
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *
21  *  SPDX-License-Identifier: Apache-2.0
22  *  ============LICENSE_END=========================================================
23  */
24
25 package org.onap.cps.api.impl;
26
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
30
31 import io.micrometer.core.annotation.Timed;
32 import java.io.Serializable;
33 import java.time.OffsetDateTime;
34 import java.util.Collection;
35 import java.util.Collections;
36 import java.util.Map;
37 import java.util.stream.Collectors;
38 import lombok.RequiredArgsConstructor;
39 import lombok.extern.slf4j.Slf4j;
40 import org.onap.cps.api.CpsAdminService;
41 import org.onap.cps.api.CpsDataService;
42 import org.onap.cps.notification.NotificationService;
43 import org.onap.cps.notification.Operation;
44 import org.onap.cps.spi.CpsDataPersistenceService;
45 import org.onap.cps.spi.FetchDescendantsOption;
46 import org.onap.cps.spi.exceptions.DataValidationException;
47 import org.onap.cps.spi.model.Anchor;
48 import org.onap.cps.spi.model.DataNode;
49 import org.onap.cps.spi.model.DataNodeBuilder;
50 import org.onap.cps.spi.utils.CpsValidator;
51 import org.onap.cps.utils.ContentType;
52 import org.onap.cps.utils.TimedYangParser;
53 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.springframework.stereotype.Service;
56
57 @Service
58 @Slf4j
59 @RequiredArgsConstructor
60 public class CpsDataServiceImpl implements CpsDataService {
61
62     private static final String ROOT_NODE_XPATH = "/";
63     private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
64
65     private final CpsDataPersistenceService cpsDataPersistenceService;
66     private final CpsAdminService cpsAdminService;
67     private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
68     private final NotificationService notificationService;
69     private final CpsValidator cpsValidator;
70     private final TimedYangParser timedYangParser;
71
72     @Override
73     public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
74         final OffsetDateTime observedTimestamp) {
75         saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
76     }
77
78     @Override
79     @Timed(value = "cps.data.service.datanode.root.save",
80         description = "Time taken to save a root data node")
81     public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
82                          final OffsetDateTime observedTimestamp, final ContentType contentType) {
83         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
84         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
85         final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
86         cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
87         processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
88     }
89
90     @Override
91     public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
92                          final String nodeData, final OffsetDateTime observedTimestamp) {
93         saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
94     }
95
96     @Override
97     @Timed(value = "cps.data.service.datanode.child.save",
98         description = "Time taken to save a child data node")
99     public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
100                          final String nodeData, final OffsetDateTime observedTimestamp,
101                          final ContentType contentType) {
102         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
103         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
104         final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
105         cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
106         processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
107     }
108
109     @Override
110     @Timed(value = "cps.data.service.list.element.save",
111         description = "Time taken to save a list element")
112     public void saveListElements(final String dataspaceName, final String anchorName,
113         final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
114         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
115         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
116         final Collection<DataNode> listElementDataNodeCollection =
117             buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
118         cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
119             listElementDataNodeCollection);
120         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
121     }
122
123     @Override
124     @Timed(value = "cps.data.service.list.element.batch.save",
125         description = "Time taken to save a batch of list elements")
126     public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
127             final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
128         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
129         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
130         final Collection<Collection<DataNode>> listElementDataNodeCollections =
131                 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
132         cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
133                 listElementDataNodeCollections);
134         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
135     }
136
137     @Override
138     @Timed(value = "cps.data.service.datanode.get",
139             description = "Time taken to get data nodes for an xpath")
140     public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
141                                              final String xpath,
142                                              final FetchDescendantsOption fetchDescendantsOption) {
143         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
144         return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
145     }
146
147     @Override
148     @Timed(value = "cps.data.service.datanode.batch.get",
149         description = "Time taken to get a batch of data nodes")
150     public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
151                                                               final Collection<String> xpaths,
152                                                               final FetchDescendantsOption fetchDescendantsOption) {
153         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
154         return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
155                 fetchDescendantsOption);
156     }
157
158     @Override
159     @Timed(value = "cps.data.service.datanode.leaves.update",
160         description = "Time taken to update a batch of leaf data nodes")
161     public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
162         final String jsonData, final OffsetDateTime observedTimestamp) {
163         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
164         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
165         final Collection<DataNode> dataNodesInPatch = buildDataNodes(anchor, parentNodeXpath, jsonData,
166                 ContentType.JSON);
167         final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
168                 .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
169         cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
170         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
171     }
172
173     @Override
174     @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
175         description = "Time taken to update data node leaves and existing descendants leaves")
176     public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
177         final String parentNodeXpath,
178         final String dataNodeUpdatesAsJson,
179         final OffsetDateTime observedTimestamp) {
180         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
181         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
182         final Collection<DataNode> dataNodeUpdates =
183             buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
184         for (final DataNode dataNodeUpdate : dataNodeUpdates) {
185             processDataNodeUpdate(anchor, dataNodeUpdate);
186         }
187         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
188     }
189
190     @Override
191     public String startSession() {
192         return cpsDataPersistenceService.startSession();
193     }
194
195     @Override
196     public void closeSession(final String sessionId) {
197         cpsDataPersistenceService.closeSession(sessionId);
198     }
199
200     @Override
201     public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
202         lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
203     }
204
205     @Override
206     public void lockAnchor(final String sessionID, final String dataspaceName,
207                            final String anchorName, final Long timeoutInMilliseconds) {
208         cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
209     }
210
211     @Override
212     @Timed(value = "cps.data.service.datanode.descendants.update",
213         description = "Time taken to update a data node and descendants")
214     public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
215                                              final String parentNodeXpath, final String jsonData,
216                                              final OffsetDateTime observedTimestamp) {
217         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
218         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
219         final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
220         cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
221         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
222     }
223
224     @Override
225     @Timed(value = "cps.data.service.datanode.descendants.batch.update",
226         description = "Time taken to update a batch of data nodes and descendants")
227     public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
228                                               final Map<String, String> nodesJsonData,
229                                               final OffsetDateTime observedTimestamp) {
230         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
231         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
232         final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
233         cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
234         nodesJsonData.keySet().forEach(nodeXpath ->
235             processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
236     }
237
238     @Override
239     @Timed(value = "cps.data.service.list.update",
240         description = "Time taken to update a list")
241     public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
242             final String jsonData, final OffsetDateTime observedTimestamp) {
243         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
244         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
245         final Collection<DataNode> newListElements =
246             buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
247         replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
248     }
249
250     @Override
251     @Timed(value = "cps.data.service.list.batch.update",
252         description = "Time taken to update a batch of lists")
253     public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
254             final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
255         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
256         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
257         cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
258         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
259     }
260
261     @Override
262     @Timed(value = "cps.data.service.datanode.delete",
263         description = "Time taken to delete a datanode")
264     public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
265                                final OffsetDateTime observedTimestamp) {
266         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
267         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
268         cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
269         processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
270     }
271
272     @Override
273     @Timed(value = "cps.data.service.datanode.batch.delete",
274         description = "Time taken to delete a batch of datanodes")
275     public void deleteDataNodes(final String dataspaceName, final String anchorName,
276                                 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
277         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
278         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
279         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
280         dataNodeXpaths.forEach(dataNodeXpath ->
281             processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
282     }
283
284     @Override
285     @Timed(value = "cps.data.service.datanode.delete.anchor",
286         description = "Time taken to delete all datanodes for an anchor")
287     public void deleteDataNodes(final String dataspaceName, final String anchorName,
288                                 final OffsetDateTime observedTimestamp) {
289         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
290         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
291         processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
292         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
293     }
294
295     @Override
296     @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
297         description = "Time taken to delete all datanodes for multiple anchors")
298     public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
299                                 final OffsetDateTime observedTimestamp) {
300         cpsValidator.validateNameCharacters(dataspaceName);
301         cpsValidator.validateNameCharacters(anchorNames);
302         for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
303             processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
304         }
305         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
306     }
307
308     @Override
309     @Timed(value = "cps.data.service.list.delete",
310         description = "Time taken to delete a list or list element")
311     public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
312         final OffsetDateTime observedTimestamp) {
313         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
314         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
315         cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
316         processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
317     }
318
319     private DataNode buildDataNode(final Anchor anchor, final String parentNodeXpath, final String nodeData,
320                                    final ContentType contentType) {
321         final SchemaContext schemaContext = getSchemaContext(anchor);
322
323         if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
324             final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
325             return new DataNodeBuilder().withContainerNode(containerNode).build();
326         }
327
328         final ContainerNode containerNode =
329             timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
330
331         return new DataNodeBuilder()
332                 .withParentNodeXpath(parentNodeXpath)
333                 .withContainerNode(containerNode)
334                 .build();
335     }
336
337     private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
338         return nodesJsonData.entrySet().stream().map(nodeJsonData ->
339             buildDataNode(anchor, nodeJsonData.getKey(),
340                 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
341     }
342
343     private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
344                                                 final String nodeData, final ContentType contentType) {
345         final SchemaContext schemaContext = getSchemaContext(anchor);
346
347         if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
348             final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
349             final Collection<DataNode> dataNodes = new DataNodeBuilder()
350                     .withContainerNode(containerNode)
351                     .buildCollection();
352             if (dataNodes.isEmpty()) {
353                 throw new DataValidationException("Invalid data.", "No data nodes provided");
354             }
355             return dataNodes;
356         }
357         final ContainerNode containerNode =
358             timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
359         final Collection<DataNode> dataNodes = new DataNodeBuilder()
360             .withParentNodeXpath(parentNodeXpath)
361             .withContainerNode(containerNode)
362             .buildCollection();
363         if (dataNodes.isEmpty()) {
364             throw new DataValidationException("Invalid data.", "No data nodes provided");
365         }
366         return dataNodes;
367     }
368
369     private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
370                                                             final Collection<String> nodeDataList,
371                                                             final ContentType contentType) {
372         return nodeDataList.stream()
373             .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
374             .collect(Collectors.toList());
375     }
376
377     private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
378                                               final Operation operation, final OffsetDateTime observedTimestamp) {
379         try {
380             notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
381         } catch (final Exception exception) {
382             //If async message can't be queued for notification service, the initial request should not failed.
383             log.error("Failed to send message to notification service", exception);
384         }
385     }
386
387     private SchemaContext getSchemaContext(final Anchor anchor) {
388         return yangTextSchemaSourceSetCache
389             .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
390     }
391
392     private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
393         if (dataNodeUpdate == null) {
394             return;
395         }
396         cpsDataPersistenceService.batchUpdateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
397                 Collections.singletonMap(dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves()));
398         final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
399         for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
400             processDataNodeUpdate(anchor, childDataNodeUpdate);
401         }
402     }
403
404 }