2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021-2023 Nordix Foundation
4 * Modifications Copyright (C) 2020-2022 Bell Canada.
5 * Modifications Copyright (C) 2021 Pantheon.tech
6 * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7 * Modifications Copyright (C) 2022 Deutsche Telekom AG
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * SPDX-License-Identifier: Apache-2.0
22 * ============LICENSE_END=========================================================
25 package org.onap.cps.api.impl;
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
31 import io.micrometer.core.annotation.Timed;
32 import java.io.Serializable;
33 import java.time.OffsetDateTime;
34 import java.util.Collection;
35 import java.util.Collections;
37 import java.util.stream.Collectors;
38 import lombok.RequiredArgsConstructor;
39 import lombok.extern.slf4j.Slf4j;
40 import org.onap.cps.api.CpsAdminService;
41 import org.onap.cps.api.CpsDataService;
42 import org.onap.cps.cpspath.parser.CpsPathUtil;
43 import org.onap.cps.notification.NotificationService;
44 import org.onap.cps.notification.Operation;
45 import org.onap.cps.spi.CpsDataPersistenceService;
46 import org.onap.cps.spi.FetchDescendantsOption;
47 import org.onap.cps.spi.exceptions.DataValidationException;
48 import org.onap.cps.spi.model.Anchor;
49 import org.onap.cps.spi.model.DataNode;
50 import org.onap.cps.spi.model.DataNodeBuilder;
51 import org.onap.cps.spi.utils.CpsValidator;
52 import org.onap.cps.utils.ContentType;
53 import org.onap.cps.utils.TimedYangParser;
54 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 import org.springframework.stereotype.Service;
60 @RequiredArgsConstructor
61 public class CpsDataServiceImpl implements CpsDataService {
63 private static final String ROOT_NODE_XPATH = "/";
64 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
66 private final CpsDataPersistenceService cpsDataPersistenceService;
67 private final CpsAdminService cpsAdminService;
68 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
69 private final NotificationService notificationService;
70 private final CpsValidator cpsValidator;
71 private final TimedYangParser timedYangParser;
74 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
75 final OffsetDateTime observedTimestamp) {
76 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
80 @Timed(value = "cps.data.service.datanode.root.save",
81 description = "Time taken to save a root data node")
82 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
83 final OffsetDateTime observedTimestamp, final ContentType contentType) {
84 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
85 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
86 final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
87 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
88 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
92 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
93 final String nodeData, final OffsetDateTime observedTimestamp) {
94 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
98 @Timed(value = "cps.data.service.datanode.child.save",
99 description = "Time taken to save a child data node")
100 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
101 final String nodeData, final OffsetDateTime observedTimestamp,
102 final ContentType contentType) {
103 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
104 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
105 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
106 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
107 processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
111 @Timed(value = "cps.data.service.list.element.save",
112 description = "Time taken to save a list element")
113 public void saveListElements(final String dataspaceName, final String anchorName,
114 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
115 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
116 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
117 final Collection<DataNode> listElementDataNodeCollection =
118 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
119 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
120 listElementDataNodeCollection);
121 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
125 @Timed(value = "cps.data.service.list.element.batch.save",
126 description = "Time taken to save a batch of list elements")
127 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
128 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
129 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
130 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
131 final Collection<Collection<DataNode>> listElementDataNodeCollections =
132 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
133 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
134 listElementDataNodeCollections);
135 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
139 @Timed(value = "cps.data.service.datanode.get",
140 description = "Time taken to get data nodes for an xpath")
141 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
143 final FetchDescendantsOption fetchDescendantsOption) {
144 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
145 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
149 @Timed(value = "cps.data.service.datanode.batch.get",
150 description = "Time taken to get a batch of data nodes")
151 public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
152 final Collection<String> xpaths,
153 final FetchDescendantsOption fetchDescendantsOption) {
154 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
155 return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
156 fetchDescendantsOption);
160 @Timed(value = "cps.data.service.datanode.leaves.update",
161 description = "Time taken to update a batch of leaf data nodes")
162 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
163 final String jsonData, final OffsetDateTime observedTimestamp) {
164 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
165 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
166 final Collection<DataNode> dataNodesInPatch = buildDataNodes(anchor, parentNodeXpath, jsonData,
168 final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
169 .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
170 cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
171 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
175 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
176 description = "Time taken to update data node leaves and existing descendants leaves")
177 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
178 final String parentNodeXpath,
179 final String dataNodeUpdatesAsJson,
180 final OffsetDateTime observedTimestamp) {
181 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
182 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
183 final Collection<DataNode> dataNodeUpdates =
184 buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
185 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
186 processDataNodeUpdate(anchor, dataNodeUpdate);
188 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
192 public String startSession() {
193 return cpsDataPersistenceService.startSession();
197 public void closeSession(final String sessionId) {
198 cpsDataPersistenceService.closeSession(sessionId);
202 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
203 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
207 public void lockAnchor(final String sessionID, final String dataspaceName,
208 final String anchorName, final Long timeoutInMilliseconds) {
209 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
213 @Timed(value = "cps.data.service.datanode.descendants.update",
214 description = "Time taken to update a data node and descendants")
215 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
216 final String parentNodeXpath, final String jsonData,
217 final OffsetDateTime observedTimestamp) {
218 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
219 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
220 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
221 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
222 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
226 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
227 description = "Time taken to update a batch of data nodes and descendants")
228 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
229 final Map<String, String> nodesJsonData,
230 final OffsetDateTime observedTimestamp) {
231 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
232 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
233 final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
234 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
235 nodesJsonData.keySet().forEach(nodeXpath ->
236 processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
240 @Timed(value = "cps.data.service.list.update",
241 description = "Time taken to update a list")
242 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
243 final String jsonData, final OffsetDateTime observedTimestamp) {
244 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
245 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
246 final Collection<DataNode> newListElements =
247 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
248 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
252 @Timed(value = "cps.data.service.list.batch.update",
253 description = "Time taken to update a batch of lists")
254 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
255 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
256 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
257 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
258 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
259 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
263 @Timed(value = "cps.data.service.datanode.delete",
264 description = "Time taken to delete a datanode")
265 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
266 final OffsetDateTime observedTimestamp) {
267 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
268 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
269 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
270 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
274 @Timed(value = "cps.data.service.datanode.batch.delete",
275 description = "Time taken to delete a batch of datanodes")
276 public void deleteDataNodes(final String dataspaceName, final String anchorName,
277 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
278 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
279 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
280 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
281 dataNodeXpaths.forEach(dataNodeXpath ->
282 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
286 @Timed(value = "cps.data.service.datanode.delete.anchor",
287 description = "Time taken to delete all datanodes for an anchor")
288 public void deleteDataNodes(final String dataspaceName, final String anchorName,
289 final OffsetDateTime observedTimestamp) {
290 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
291 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
292 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
293 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
297 @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
298 description = "Time taken to delete all datanodes for multiple anchors")
299 public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
300 final OffsetDateTime observedTimestamp) {
301 cpsValidator.validateNameCharacters(dataspaceName);
302 cpsValidator.validateNameCharacters(anchorNames);
303 for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
304 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
306 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
310 @Timed(value = "cps.data.service.list.delete",
311 description = "Time taken to delete a list or list element")
312 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
313 final OffsetDateTime observedTimestamp) {
314 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
315 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
316 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
317 processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
320 private DataNode buildDataNode(final Anchor anchor, final String parentNodeXpath, final String nodeData,
321 final ContentType contentType) {
322 final SchemaContext schemaContext = getSchemaContext(anchor);
324 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
325 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
326 return new DataNodeBuilder().withContainerNode(containerNode).build();
329 final ContainerNode containerNode =
330 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
332 return new DataNodeBuilder()
333 .withParentNodeXpath(parentNodeXpath)
334 .withContainerNode(containerNode)
338 private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
339 return nodesJsonData.entrySet().stream().map(nodeJsonData ->
340 buildDataNode(anchor, nodeJsonData.getKey(),
341 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
344 private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
345 final String nodeData, final ContentType contentType) {
346 final SchemaContext schemaContext = getSchemaContext(anchor);
348 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
349 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
350 final Collection<DataNode> dataNodes = new DataNodeBuilder()
351 .withContainerNode(containerNode)
353 if (dataNodes.isEmpty()) {
354 throw new DataValidationException("Invalid data.", "No data nodes provided");
358 final String normalizedParentNodeXpath = CpsPathUtil.getNormalizedXpath(parentNodeXpath);
359 final ContainerNode containerNode =
360 timedYangParser.parseData(contentType, nodeData, schemaContext, normalizedParentNodeXpath);
361 final Collection<DataNode> dataNodes = new DataNodeBuilder()
362 .withParentNodeXpath(normalizedParentNodeXpath)
363 .withContainerNode(containerNode)
365 if (dataNodes.isEmpty()) {
366 throw new DataValidationException("Invalid data.", "No data nodes provided");
371 private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
372 final Collection<String> nodeDataList,
373 final ContentType contentType) {
374 return nodeDataList.stream()
375 .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
376 .collect(Collectors.toList());
379 private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
380 final Operation operation, final OffsetDateTime observedTimestamp) {
382 notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
383 } catch (final Exception exception) {
384 //If async message can't be queued for notification service, the initial request should not failed.
385 log.error("Failed to send message to notification service", exception);
389 private SchemaContext getSchemaContext(final Anchor anchor) {
390 return yangTextSchemaSourceSetCache
391 .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
394 private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
395 if (dataNodeUpdate == null) {
398 cpsDataPersistenceService.batchUpdateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
399 Collections.singletonMap(dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves()));
400 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
401 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
402 processDataNodeUpdate(anchor, childDataNodeUpdate);