2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021-2023 Nordix Foundation
4 * Modifications Copyright (C) 2020-2022 Bell Canada.
5 * Modifications Copyright (C) 2021 Pantheon.tech
6 * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7 * Modifications Copyright (C) 2022 Deutsche Telekom AG
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * SPDX-License-Identifier: Apache-2.0
22 * ============LICENSE_END=========================================================
25 package org.onap.cps.api.impl;
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
31 import io.micrometer.core.annotation.Timed;
32 import java.io.Serializable;
33 import java.time.OffsetDateTime;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
38 import java.util.stream.Collectors;
39 import lombok.RequiredArgsConstructor;
40 import lombok.extern.slf4j.Slf4j;
41 import org.onap.cps.api.CpsAdminService;
42 import org.onap.cps.api.CpsDataService;
43 import org.onap.cps.cpspath.parser.CpsPathUtil;
44 import org.onap.cps.notification.NotificationService;
45 import org.onap.cps.notification.Operation;
46 import org.onap.cps.spi.CpsDataPersistenceService;
47 import org.onap.cps.spi.FetchDescendantsOption;
48 import org.onap.cps.spi.exceptions.DataValidationException;
49 import org.onap.cps.spi.model.Anchor;
50 import org.onap.cps.spi.model.DataNode;
51 import org.onap.cps.spi.model.DataNodeBuilder;
52 import org.onap.cps.spi.utils.CpsValidator;
53 import org.onap.cps.utils.ContentType;
54 import org.onap.cps.utils.TimedYangParser;
55 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 import org.springframework.stereotype.Service;
61 @RequiredArgsConstructor
62 public class CpsDataServiceImpl implements CpsDataService {
64 private static final String ROOT_NODE_XPATH = "/";
65 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
67 private final CpsDataPersistenceService cpsDataPersistenceService;
68 private final CpsAdminService cpsAdminService;
69 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
70 private final NotificationService notificationService;
71 private final CpsValidator cpsValidator;
72 private final TimedYangParser timedYangParser;
75 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
76 final OffsetDateTime observedTimestamp) {
77 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
81 @Timed(value = "cps.data.service.datanode.root.save",
82 description = "Time taken to save a root data node")
83 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
84 final OffsetDateTime observedTimestamp, final ContentType contentType) {
85 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
86 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
87 final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
88 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
89 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
93 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
94 final String nodeData, final OffsetDateTime observedTimestamp) {
95 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
99 @Timed(value = "cps.data.service.datanode.child.save",
100 description = "Time taken to save a child data node")
101 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
102 final String nodeData, final OffsetDateTime observedTimestamp,
103 final ContentType contentType) {
104 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
105 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
106 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
107 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
108 processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
112 @Timed(value = "cps.data.service.list.element.save",
113 description = "Time taken to save a list element")
114 public void saveListElements(final String dataspaceName, final String anchorName,
115 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
116 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
117 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
118 final Collection<DataNode> listElementDataNodeCollection =
119 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
120 if (isRootNodeXpath(parentNodeXpath)) {
121 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, listElementDataNodeCollection);
123 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
124 listElementDataNodeCollection);
126 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
130 @Timed(value = "cps.data.service.list.element.batch.save",
131 description = "Time taken to save a batch of list elements")
132 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
133 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
134 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
135 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
136 final Collection<Collection<DataNode>> listElementDataNodeCollections =
137 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
138 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
139 listElementDataNodeCollections);
140 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
144 @Timed(value = "cps.data.service.datanode.get",
145 description = "Time taken to get data nodes for an xpath")
146 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
148 final FetchDescendantsOption fetchDescendantsOption) {
149 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
150 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
154 @Timed(value = "cps.data.service.datanode.batch.get",
155 description = "Time taken to get a batch of data nodes")
156 public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
157 final Collection<String> xpaths,
158 final FetchDescendantsOption fetchDescendantsOption) {
159 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
160 return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
161 fetchDescendantsOption);
165 @Timed(value = "cps.data.service.datanode.leaves.update",
166 description = "Time taken to update a batch of leaf data nodes")
167 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
168 final String jsonData, final OffsetDateTime observedTimestamp) {
169 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
170 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
171 final Collection<DataNode> dataNodesInPatch = buildDataNodes(anchor, parentNodeXpath, jsonData,
173 final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
174 .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
175 cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
176 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
180 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
181 description = "Time taken to update data node leaves and existing descendants leaves")
182 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
183 final String parentNodeXpath,
184 final String dataNodeUpdatesAsJson,
185 final OffsetDateTime observedTimestamp) {
186 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
187 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
188 final Collection<DataNode> dataNodeUpdates =
189 buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
190 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
191 processDataNodeUpdate(anchor, dataNodeUpdate);
193 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
197 public String startSession() {
198 return cpsDataPersistenceService.startSession();
202 public void closeSession(final String sessionId) {
203 cpsDataPersistenceService.closeSession(sessionId);
207 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
208 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
212 public void lockAnchor(final String sessionID, final String dataspaceName,
213 final String anchorName, final Long timeoutInMilliseconds) {
214 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
218 @Timed(value = "cps.data.service.datanode.descendants.update",
219 description = "Time taken to update a data node and descendants")
220 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
221 final String parentNodeXpath, final String jsonData,
222 final OffsetDateTime observedTimestamp) {
223 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
224 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
225 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
226 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
227 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
231 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
232 description = "Time taken to update a batch of data nodes and descendants")
233 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
234 final Map<String, String> nodesJsonData,
235 final OffsetDateTime observedTimestamp) {
236 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
237 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
238 final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
239 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
240 nodesJsonData.keySet().forEach(nodeXpath ->
241 processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
245 @Timed(value = "cps.data.service.list.update",
246 description = "Time taken to update a list")
247 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
248 final String jsonData, final OffsetDateTime observedTimestamp) {
249 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
250 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
251 final Collection<DataNode> newListElements =
252 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
253 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
257 @Timed(value = "cps.data.service.list.batch.update",
258 description = "Time taken to update a batch of lists")
259 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
260 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
261 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
262 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
263 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
264 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
268 @Timed(value = "cps.data.service.datanode.delete",
269 description = "Time taken to delete a datanode")
270 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
271 final OffsetDateTime observedTimestamp) {
272 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
273 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
274 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
275 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
279 @Timed(value = "cps.data.service.datanode.batch.delete",
280 description = "Time taken to delete a batch of datanodes")
281 public void deleteDataNodes(final String dataspaceName, final String anchorName,
282 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
283 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
284 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
285 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
286 dataNodeXpaths.forEach(dataNodeXpath ->
287 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
291 @Timed(value = "cps.data.service.datanode.delete.anchor",
292 description = "Time taken to delete all datanodes for an anchor")
293 public void deleteDataNodes(final String dataspaceName, final String anchorName,
294 final OffsetDateTime observedTimestamp) {
295 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
296 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
297 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
298 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
302 @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
303 description = "Time taken to delete all datanodes for multiple anchors")
304 public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
305 final OffsetDateTime observedTimestamp) {
306 cpsValidator.validateNameCharacters(dataspaceName);
307 cpsValidator.validateNameCharacters(anchorNames);
308 for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
309 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
311 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
315 @Timed(value = "cps.data.service.list.delete",
316 description = "Time taken to delete a list or list element")
317 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
318 final OffsetDateTime observedTimestamp) {
319 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
320 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
321 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
322 processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
325 private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
326 final Collection<DataNode> dataNodes = new ArrayList<>();
327 for (final Map.Entry<String, String> nodeJsonData : nodesJsonData.entrySet()) {
328 dataNodes.addAll(buildDataNodes(anchor, nodeJsonData.getKey(), nodeJsonData.getValue(), ContentType.JSON));
333 private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
334 final String nodeData, final ContentType contentType) {
335 final SchemaContext schemaContext = getSchemaContext(anchor);
337 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
338 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
339 final Collection<DataNode> dataNodes = new DataNodeBuilder()
340 .withContainerNode(containerNode)
342 if (dataNodes.isEmpty()) {
343 throw new DataValidationException("No data nodes.", "No data nodes provided");
347 final String normalizedParentNodeXpath = CpsPathUtil.getNormalizedXpath(parentNodeXpath);
348 final ContainerNode containerNode =
349 timedYangParser.parseData(contentType, nodeData, schemaContext, normalizedParentNodeXpath);
350 final Collection<DataNode> dataNodes = new DataNodeBuilder()
351 .withParentNodeXpath(normalizedParentNodeXpath)
352 .withContainerNode(containerNode)
354 if (dataNodes.isEmpty()) {
355 throw new DataValidationException("No data nodes.", "No data nodes provided");
360 private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
361 final Collection<String> nodeDataList,
362 final ContentType contentType) {
363 return nodeDataList.stream()
364 .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
365 .collect(Collectors.toList());
368 private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
369 final Operation operation, final OffsetDateTime observedTimestamp) {
371 notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
372 } catch (final Exception exception) {
373 //If async message can't be queued for notification service, the initial request should not fail.
374 log.error("Failed to send message to notification service", exception);
378 private SchemaContext getSchemaContext(final Anchor anchor) {
379 return yangTextSchemaSourceSetCache
380 .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
383 private static boolean isRootNodeXpath(final String xpath) {
384 return ROOT_NODE_XPATH.equals(xpath);
387 private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
388 cpsDataPersistenceService.batchUpdateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
389 Collections.singletonMap(dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves()));
390 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
391 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
392 processDataNodeUpdate(anchor, childDataNodeUpdate);