Merge "Add memory usage to integration tests [UPDATED]"
[cps.git] / cps-service / src / main / java / org / onap / cps / api / impl / CpsDataServiceImpl.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2021-2023 Nordix Foundation
4  *  Modifications Copyright (C) 2020-2022 Bell Canada.
5  *  Modifications Copyright (C) 2021 Pantheon.tech
6  *  Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7  *  Modifications Copyright (C) 2022 Deutsche Telekom AG
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *
13  *        http://www.apache.org/licenses/LICENSE-2.0
14  *
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *
21  *  SPDX-License-Identifier: Apache-2.0
22  *  ============LICENSE_END=========================================================
23  */
24
25 package org.onap.cps.api.impl;
26
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
30
31 import io.micrometer.core.annotation.Timed;
32 import java.io.Serializable;
33 import java.time.OffsetDateTime;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.Map;
38 import java.util.stream.Collectors;
39 import lombok.RequiredArgsConstructor;
40 import lombok.extern.slf4j.Slf4j;
41 import org.onap.cps.api.CpsAdminService;
42 import org.onap.cps.api.CpsDataService;
43 import org.onap.cps.cpspath.parser.CpsPathUtil;
44 import org.onap.cps.notification.NotificationService;
45 import org.onap.cps.notification.Operation;
46 import org.onap.cps.spi.CpsDataPersistenceService;
47 import org.onap.cps.spi.FetchDescendantsOption;
48 import org.onap.cps.spi.exceptions.DataValidationException;
49 import org.onap.cps.spi.model.Anchor;
50 import org.onap.cps.spi.model.DataNode;
51 import org.onap.cps.spi.model.DataNodeBuilder;
52 import org.onap.cps.spi.utils.CpsValidator;
53 import org.onap.cps.utils.ContentType;
54 import org.onap.cps.utils.TimedYangParser;
55 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 import org.springframework.stereotype.Service;
58
59 @Service
60 @Slf4j
61 @RequiredArgsConstructor
62 public class CpsDataServiceImpl implements CpsDataService {
63
64     private static final String ROOT_NODE_XPATH = "/";
65     private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
66
67     private final CpsDataPersistenceService cpsDataPersistenceService;
68     private final CpsAdminService cpsAdminService;
69     private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
70     private final NotificationService notificationService;
71     private final CpsValidator cpsValidator;
72     private final TimedYangParser timedYangParser;
73
74     @Override
75     public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
76         final OffsetDateTime observedTimestamp) {
77         saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
78     }
79
80     @Override
81     @Timed(value = "cps.data.service.datanode.root.save",
82         description = "Time taken to save a root data node")
83     public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
84                          final OffsetDateTime observedTimestamp, final ContentType contentType) {
85         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
86         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
87         final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
88         cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
89         processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
90     }
91
92     @Override
93     public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
94                          final String nodeData, final OffsetDateTime observedTimestamp) {
95         saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
96     }
97
98     @Override
99     @Timed(value = "cps.data.service.datanode.child.save",
100         description = "Time taken to save a child data node")
101     public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
102                          final String nodeData, final OffsetDateTime observedTimestamp,
103                          final ContentType contentType) {
104         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
105         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
106         final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
107         cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
108         processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
109     }
110
111     @Override
112     @Timed(value = "cps.data.service.list.element.save",
113         description = "Time taken to save a list element")
114     public void saveListElements(final String dataspaceName, final String anchorName,
115         final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
116         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
117         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
118         final Collection<DataNode> listElementDataNodeCollection =
119             buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
120         if (isRootNodeXpath(parentNodeXpath)) {
121             cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, listElementDataNodeCollection);
122         } else {
123             cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
124                                                       listElementDataNodeCollection);
125         }
126         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
127     }
128
129     @Override
130     @Timed(value = "cps.data.service.list.element.batch.save",
131         description = "Time taken to save a batch of list elements")
132     public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
133             final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
134         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
135         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
136         final Collection<Collection<DataNode>> listElementDataNodeCollections =
137                 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
138         cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
139                 listElementDataNodeCollections);
140         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
141     }
142
143     @Override
144     @Timed(value = "cps.data.service.datanode.get",
145             description = "Time taken to get data nodes for an xpath")
146     public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
147                                              final String xpath,
148                                              final FetchDescendantsOption fetchDescendantsOption) {
149         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
150         return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
151     }
152
153     @Override
154     @Timed(value = "cps.data.service.datanode.batch.get",
155         description = "Time taken to get a batch of data nodes")
156     public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
157                                                               final Collection<String> xpaths,
158                                                               final FetchDescendantsOption fetchDescendantsOption) {
159         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
160         return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
161                 fetchDescendantsOption);
162     }
163
164     @Override
165     @Timed(value = "cps.data.service.datanode.leaves.update",
166         description = "Time taken to update a batch of leaf data nodes")
167     public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
168         final String jsonData, final OffsetDateTime observedTimestamp) {
169         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
170         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
171         final Collection<DataNode> dataNodesInPatch = buildDataNodes(anchor, parentNodeXpath, jsonData,
172                 ContentType.JSON);
173         final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
174                 .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
175         cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
176         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
177     }
178
179     @Override
180     @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
181         description = "Time taken to update data node leaves and existing descendants leaves")
182     public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
183         final String parentNodeXpath,
184         final String dataNodeUpdatesAsJson,
185         final OffsetDateTime observedTimestamp) {
186         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
187         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
188         final Collection<DataNode> dataNodeUpdates =
189             buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
190         for (final DataNode dataNodeUpdate : dataNodeUpdates) {
191             processDataNodeUpdate(anchor, dataNodeUpdate);
192         }
193         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
194     }
195
196     @Override
197     public String startSession() {
198         return cpsDataPersistenceService.startSession();
199     }
200
201     @Override
202     public void closeSession(final String sessionId) {
203         cpsDataPersistenceService.closeSession(sessionId);
204     }
205
206     @Override
207     public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
208         lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
209     }
210
211     @Override
212     public void lockAnchor(final String sessionID, final String dataspaceName,
213                            final String anchorName, final Long timeoutInMilliseconds) {
214         cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
215     }
216
217     @Override
218     @Timed(value = "cps.data.service.datanode.descendants.update",
219         description = "Time taken to update a data node and descendants")
220     public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
221                                              final String parentNodeXpath, final String jsonData,
222                                              final OffsetDateTime observedTimestamp) {
223         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
224         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
225         final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
226         cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
227         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
228     }
229
230     @Override
231     @Timed(value = "cps.data.service.datanode.descendants.batch.update",
232         description = "Time taken to update a batch of data nodes and descendants")
233     public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
234                                               final Map<String, String> nodesJsonData,
235                                               final OffsetDateTime observedTimestamp) {
236         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
237         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
238         final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
239         cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
240         nodesJsonData.keySet().forEach(nodeXpath ->
241             processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
242     }
243
244     @Override
245     @Timed(value = "cps.data.service.list.update",
246         description = "Time taken to update a list")
247     public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
248             final String jsonData, final OffsetDateTime observedTimestamp) {
249         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
250         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
251         final Collection<DataNode> newListElements =
252             buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
253         replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
254     }
255
256     @Override
257     @Timed(value = "cps.data.service.list.batch.update",
258         description = "Time taken to update a batch of lists")
259     public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
260             final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
261         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
262         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
263         cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
264         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
265     }
266
267     @Override
268     @Timed(value = "cps.data.service.datanode.delete",
269         description = "Time taken to delete a datanode")
270     public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
271                                final OffsetDateTime observedTimestamp) {
272         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
273         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
274         cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
275         processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
276     }
277
278     @Override
279     @Timed(value = "cps.data.service.datanode.batch.delete",
280         description = "Time taken to delete a batch of datanodes")
281     public void deleteDataNodes(final String dataspaceName, final String anchorName,
282                                 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
283         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
284         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
285         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
286         dataNodeXpaths.forEach(dataNodeXpath ->
287             processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
288     }
289
290     @Override
291     @Timed(value = "cps.data.service.datanode.delete.anchor",
292         description = "Time taken to delete all datanodes for an anchor")
293     public void deleteDataNodes(final String dataspaceName, final String anchorName,
294                                 final OffsetDateTime observedTimestamp) {
295         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
296         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
297         processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
298         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
299     }
300
301     @Override
302     @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
303         description = "Time taken to delete all datanodes for multiple anchors")
304     public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
305                                 final OffsetDateTime observedTimestamp) {
306         cpsValidator.validateNameCharacters(dataspaceName);
307         cpsValidator.validateNameCharacters(anchorNames);
308         for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
309             processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
310         }
311         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
312     }
313
314     @Override
315     @Timed(value = "cps.data.service.list.delete",
316         description = "Time taken to delete a list or list element")
317     public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
318         final OffsetDateTime observedTimestamp) {
319         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
320         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
321         cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
322         processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
323     }
324
325     private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
326         final Collection<DataNode> dataNodes = new ArrayList<>();
327         for (final Map.Entry<String, String> nodeJsonData : nodesJsonData.entrySet()) {
328             dataNodes.addAll(buildDataNodes(anchor, nodeJsonData.getKey(), nodeJsonData.getValue(), ContentType.JSON));
329         }
330         return dataNodes;
331     }
332
333     private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
334                                                 final String nodeData, final ContentType contentType) {
335         final SchemaContext schemaContext = getSchemaContext(anchor);
336
337         if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
338             final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
339             final Collection<DataNode> dataNodes = new DataNodeBuilder()
340                     .withContainerNode(containerNode)
341                     .buildCollection();
342             if (dataNodes.isEmpty()) {
343                 throw new DataValidationException("No data nodes.", "No data nodes provided");
344             }
345             return dataNodes;
346         }
347         final String normalizedParentNodeXpath = CpsPathUtil.getNormalizedXpath(parentNodeXpath);
348         final ContainerNode containerNode =
349             timedYangParser.parseData(contentType, nodeData, schemaContext, normalizedParentNodeXpath);
350         final Collection<DataNode> dataNodes = new DataNodeBuilder()
351             .withParentNodeXpath(normalizedParentNodeXpath)
352             .withContainerNode(containerNode)
353             .buildCollection();
354         if (dataNodes.isEmpty()) {
355             throw new DataValidationException("No data nodes.", "No data nodes provided");
356         }
357         return dataNodes;
358     }
359
360     private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
361                                                             final Collection<String> nodeDataList,
362                                                             final ContentType contentType) {
363         return nodeDataList.stream()
364             .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
365             .collect(Collectors.toList());
366     }
367
368     private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
369                                               final Operation operation, final OffsetDateTime observedTimestamp) {
370         try {
371             notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
372         } catch (final Exception exception) {
373             //If async message can't be queued for notification service, the initial request should not fail.
374             log.error("Failed to send message to notification service", exception);
375         }
376     }
377
378     private SchemaContext getSchemaContext(final Anchor anchor) {
379         return yangTextSchemaSourceSetCache
380             .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
381     }
382
383     private static boolean isRootNodeXpath(final String xpath) {
384         return ROOT_NODE_XPATH.equals(xpath);
385     }
386
387     private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
388         cpsDataPersistenceService.batchUpdateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
389                 Collections.singletonMap(dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves()));
390         final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
391         for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
392             processDataNodeUpdate(anchor, childDataNodeUpdate);
393         }
394     }
395
396 }