2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021-2023 Nordix Foundation
4 * Modifications Copyright (C) 2020-2022 Bell Canada.
5 * Modifications Copyright (C) 2021 Pantheon.tech
6 * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7 * Modifications Copyright (C) 2022 Deutsche Telekom AG
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * SPDX-License-Identifier: Apache-2.0
22 * ============LICENSE_END=========================================================
25 package org.onap.cps.api.impl;
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
31 import io.micrometer.core.annotation.Timed;
32 import java.io.Serializable;
33 import java.time.OffsetDateTime;
34 import java.util.Collection;
35 import java.util.Collections;
37 import java.util.stream.Collectors;
38 import lombok.RequiredArgsConstructor;
39 import lombok.extern.slf4j.Slf4j;
40 import org.onap.cps.api.CpsAdminService;
41 import org.onap.cps.api.CpsDataService;
42 import org.onap.cps.cpspath.parser.CpsPathUtil;
43 import org.onap.cps.notification.NotificationService;
44 import org.onap.cps.notification.Operation;
45 import org.onap.cps.spi.CpsDataPersistenceService;
46 import org.onap.cps.spi.FetchDescendantsOption;
47 import org.onap.cps.spi.exceptions.DataValidationException;
48 import org.onap.cps.spi.model.Anchor;
49 import org.onap.cps.spi.model.DataNode;
50 import org.onap.cps.spi.model.DataNodeBuilder;
51 import org.onap.cps.spi.utils.CpsValidator;
52 import org.onap.cps.utils.ContentType;
53 import org.onap.cps.utils.TimedYangParser;
54 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 import org.springframework.stereotype.Service;
60 @RequiredArgsConstructor
61 public class CpsDataServiceImpl implements CpsDataService {
63 private static final String ROOT_NODE_XPATH = "/";
64 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
66 private final CpsDataPersistenceService cpsDataPersistenceService;
67 private final CpsAdminService cpsAdminService;
68 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
69 private final NotificationService notificationService;
70 private final CpsValidator cpsValidator;
71 private final TimedYangParser timedYangParser;
74 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
75 final OffsetDateTime observedTimestamp) {
76 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
80 @Timed(value = "cps.data.service.datanode.root.save",
81 description = "Time taken to save a root data node")
82 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
83 final OffsetDateTime observedTimestamp, final ContentType contentType) {
84 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
85 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
86 final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
87 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
88 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
92 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
93 final String nodeData, final OffsetDateTime observedTimestamp) {
94 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
98 @Timed(value = "cps.data.service.datanode.child.save",
99 description = "Time taken to save a child data node")
100 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
101 final String nodeData, final OffsetDateTime observedTimestamp,
102 final ContentType contentType) {
103 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
104 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
105 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
106 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
107 processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
111 @Timed(value = "cps.data.service.list.element.save",
112 description = "Time taken to save a list element")
113 public void saveListElements(final String dataspaceName, final String anchorName,
114 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
115 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
116 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
117 final Collection<DataNode> listElementDataNodeCollection =
118 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
119 if (isRootNodeXpath(parentNodeXpath)) {
120 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, listElementDataNodeCollection);
122 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
123 listElementDataNodeCollection);
125 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
129 @Timed(value = "cps.data.service.list.element.batch.save",
130 description = "Time taken to save a batch of list elements")
131 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
132 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
133 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
134 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
135 final Collection<Collection<DataNode>> listElementDataNodeCollections =
136 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
137 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
138 listElementDataNodeCollections);
139 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
143 @Timed(value = "cps.data.service.datanode.get",
144 description = "Time taken to get data nodes for an xpath")
145 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
147 final FetchDescendantsOption fetchDescendantsOption) {
148 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
149 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
153 @Timed(value = "cps.data.service.datanode.batch.get",
154 description = "Time taken to get a batch of data nodes")
155 public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
156 final Collection<String> xpaths,
157 final FetchDescendantsOption fetchDescendantsOption) {
158 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
159 return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
160 fetchDescendantsOption);
164 @Timed(value = "cps.data.service.datanode.leaves.update",
165 description = "Time taken to update a batch of leaf data nodes")
166 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
167 final String jsonData, final OffsetDateTime observedTimestamp) {
168 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
169 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
170 final Collection<DataNode> dataNodesInPatch = buildDataNodes(anchor, parentNodeXpath, jsonData,
172 final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
173 .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
174 cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
175 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
179 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
180 description = "Time taken to update data node leaves and existing descendants leaves")
181 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
182 final String parentNodeXpath,
183 final String dataNodeUpdatesAsJson,
184 final OffsetDateTime observedTimestamp) {
185 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
186 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
187 final Collection<DataNode> dataNodeUpdates =
188 buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
189 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
190 processDataNodeUpdate(anchor, dataNodeUpdate);
192 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
196 public String startSession() {
197 return cpsDataPersistenceService.startSession();
201 public void closeSession(final String sessionId) {
202 cpsDataPersistenceService.closeSession(sessionId);
206 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
207 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
211 public void lockAnchor(final String sessionID, final String dataspaceName,
212 final String anchorName, final Long timeoutInMilliseconds) {
213 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
217 @Timed(value = "cps.data.service.datanode.descendants.update",
218 description = "Time taken to update a data node and descendants")
219 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
220 final String parentNodeXpath, final String jsonData,
221 final OffsetDateTime observedTimestamp) {
222 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
223 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
224 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
225 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
226 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
230 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
231 description = "Time taken to update a batch of data nodes and descendants")
232 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
233 final Map<String, String> nodesJsonData,
234 final OffsetDateTime observedTimestamp) {
235 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
236 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
237 final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
238 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
239 nodesJsonData.keySet().forEach(nodeXpath ->
240 processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
244 @Timed(value = "cps.data.service.list.update",
245 description = "Time taken to update a list")
246 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
247 final String jsonData, final OffsetDateTime observedTimestamp) {
248 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
249 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
250 final Collection<DataNode> newListElements =
251 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
252 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
256 @Timed(value = "cps.data.service.list.batch.update",
257 description = "Time taken to update a batch of lists")
258 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
259 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
260 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
261 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
262 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
263 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
267 @Timed(value = "cps.data.service.datanode.delete",
268 description = "Time taken to delete a datanode")
269 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
270 final OffsetDateTime observedTimestamp) {
271 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
272 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
273 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
274 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
278 @Timed(value = "cps.data.service.datanode.batch.delete",
279 description = "Time taken to delete a batch of datanodes")
280 public void deleteDataNodes(final String dataspaceName, final String anchorName,
281 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
282 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
283 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
284 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
285 dataNodeXpaths.forEach(dataNodeXpath ->
286 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
290 @Timed(value = "cps.data.service.datanode.delete.anchor",
291 description = "Time taken to delete all datanodes for an anchor")
292 public void deleteDataNodes(final String dataspaceName, final String anchorName,
293 final OffsetDateTime observedTimestamp) {
294 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
295 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
296 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
297 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
301 @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
302 description = "Time taken to delete all datanodes for multiple anchors")
303 public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
304 final OffsetDateTime observedTimestamp) {
305 cpsValidator.validateNameCharacters(dataspaceName);
306 cpsValidator.validateNameCharacters(anchorNames);
307 for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
308 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
310 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
314 @Timed(value = "cps.data.service.list.delete",
315 description = "Time taken to delete a list or list element")
316 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
317 final OffsetDateTime observedTimestamp) {
318 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
319 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
320 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
321 processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
324 private DataNode buildDataNode(final Anchor anchor, final String parentNodeXpath, final String nodeData,
325 final ContentType contentType) {
326 final SchemaContext schemaContext = getSchemaContext(anchor);
328 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
329 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
330 return new DataNodeBuilder().withContainerNode(containerNode).build();
333 final ContainerNode containerNode =
334 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
336 return new DataNodeBuilder()
337 .withParentNodeXpath(parentNodeXpath)
338 .withContainerNode(containerNode)
342 private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
343 return nodesJsonData.entrySet().stream().map(nodeJsonData ->
344 buildDataNode(anchor, nodeJsonData.getKey(),
345 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
348 private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
349 final String nodeData, final ContentType contentType) {
350 final SchemaContext schemaContext = getSchemaContext(anchor);
352 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
353 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
354 final Collection<DataNode> dataNodes = new DataNodeBuilder()
355 .withContainerNode(containerNode)
357 if (dataNodes.isEmpty()) {
358 throw new DataValidationException("No data nodes.", "No data nodes provided");
362 final String normalizedParentNodeXpath = CpsPathUtil.getNormalizedXpath(parentNodeXpath);
363 final ContainerNode containerNode =
364 timedYangParser.parseData(contentType, nodeData, schemaContext, normalizedParentNodeXpath);
365 final Collection<DataNode> dataNodes = new DataNodeBuilder()
366 .withParentNodeXpath(normalizedParentNodeXpath)
367 .withContainerNode(containerNode)
369 if (dataNodes.isEmpty()) {
370 throw new DataValidationException("No data nodes.", "No data nodes provided");
375 private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
376 final Collection<String> nodeDataList,
377 final ContentType contentType) {
378 return nodeDataList.stream()
379 .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
380 .collect(Collectors.toList());
383 private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
384 final Operation operation, final OffsetDateTime observedTimestamp) {
386 notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
387 } catch (final Exception exception) {
388 //If async message can't be queued for notification service, the initial request should not fail.
389 log.error("Failed to send message to notification service", exception);
393 private SchemaContext getSchemaContext(final Anchor anchor) {
394 return yangTextSchemaSourceSetCache
395 .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
398 private static boolean isRootNodeXpath(final String xpath) {
399 return ROOT_NODE_XPATH.equals(xpath);
402 private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
403 cpsDataPersistenceService.batchUpdateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
404 Collections.singletonMap(dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves()));
405 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
406 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
407 processDataNodeUpdate(anchor, childDataNodeUpdate);