/*
 * ============LICENSE_START=======================================================
 * Copyright (C) 2021-2023 Nordix Foundation
 * Modifications Copyright (C) 2020-2022 Bell Canada.
 * Modifications Copyright (C) 2021 Pantheon.tech
 * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
 * Modifications Copyright (C) 2022 Deutsche Telekom AG
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
package org.onap.cps.api.impl;

import static org.onap.cps.notification.Operation.CREATE;
import static org.onap.cps.notification.Operation.DELETE;
import static org.onap.cps.notification.Operation.UPDATE;

import io.micrometer.core.annotation.Timed;
import java.io.Serializable;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.CpsAdminService;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.notification.NotificationService;
import org.onap.cps.notification.Operation;
import org.onap.cps.spi.CpsDataPersistenceService;
import org.onap.cps.spi.FetchDescendantsOption;
import org.onap.cps.spi.exceptions.DataValidationException;
import org.onap.cps.spi.model.Anchor;
import org.onap.cps.spi.model.DataNode;
import org.onap.cps.spi.model.DataNodeBuilder;
import org.onap.cps.spi.utils.CpsValidator;
import org.onap.cps.utils.ContentType;
import org.onap.cps.utils.TimedYangParser;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.springframework.stereotype.Service;
59 @RequiredArgsConstructor
60 public class CpsDataServiceImpl implements CpsDataService {
62 private static final String ROOT_NODE_XPATH = "/";
63 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
65 private final CpsDataPersistenceService cpsDataPersistenceService;
66 private final CpsAdminService cpsAdminService;
67 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
68 private final NotificationService notificationService;
69 private final CpsValidator cpsValidator;
70 private final TimedYangParser timedYangParser;
73 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
74 final OffsetDateTime observedTimestamp) {
75 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
79 @Timed(value = "cps.data.service.datanode.root.save",
80 description = "Time taken to save a root data node")
81 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
82 final OffsetDateTime observedTimestamp, final ContentType contentType) {
83 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
84 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
85 final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
86 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
87 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
91 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
92 final String nodeData, final OffsetDateTime observedTimestamp) {
93 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
97 @Timed(value = "cps.data.service.datanode.child.save",
98 description = "Time taken to save a child data node")
99 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
100 final String nodeData, final OffsetDateTime observedTimestamp,
101 final ContentType contentType) {
102 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
103 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
104 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
105 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
106 processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
110 @Timed(value = "cps.data.service.list.element.save",
111 description = "Time taken to save a list element")
112 public void saveListElements(final String dataspaceName, final String anchorName,
113 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
114 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
115 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
116 final Collection<DataNode> listElementDataNodeCollection =
117 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
118 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
119 listElementDataNodeCollection);
120 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
124 @Timed(value = "cps.data.service.list.element.batch.save",
125 description = "Time taken to save a batch of list elements")
126 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
127 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
128 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
129 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
130 final Collection<Collection<DataNode>> listElementDataNodeCollections =
131 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
132 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
133 listElementDataNodeCollections);
134 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
138 @Timed(value = "cps.data.service.datanode.get",
139 description = "Time taken to get data nodes for an xpath")
140 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
142 final FetchDescendantsOption fetchDescendantsOption) {
143 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
144 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
148 @Timed(value = "cps.data.service.datanode.batch.get",
149 description = "Time taken to get a batch of data nodes")
150 public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
151 final Collection<String> xpaths,
152 final FetchDescendantsOption fetchDescendantsOption) {
153 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
154 return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
155 fetchDescendantsOption);
159 @Timed(value = "cps.data.service.datanode.leaves.update",
160 description = "Time taken to update a batch of leaf data nodes")
161 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
162 final String jsonData, final OffsetDateTime observedTimestamp) {
163 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
164 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
165 final Collection<DataNode> dataNodesInPatch = buildDataNodes(anchor, parentNodeXpath, jsonData,
167 final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
168 .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
169 cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
170 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
174 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
175 description = "Time taken to update data node leaves and existing descendants leaves")
176 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
177 final String parentNodeXpath,
178 final String dataNodeUpdatesAsJson,
179 final OffsetDateTime observedTimestamp) {
180 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
181 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
182 final Collection<DataNode> dataNodeUpdates =
183 buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
184 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
185 processDataNodeUpdate(anchor, dataNodeUpdate);
187 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
191 public String startSession() {
192 return cpsDataPersistenceService.startSession();
196 public void closeSession(final String sessionId) {
197 cpsDataPersistenceService.closeSession(sessionId);
201 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
202 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
206 public void lockAnchor(final String sessionID, final String dataspaceName,
207 final String anchorName, final Long timeoutInMilliseconds) {
208 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
212 @Timed(value = "cps.data.service.datanode.descendants.update",
213 description = "Time taken to update a data node and descendants")
214 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
215 final String parentNodeXpath, final String jsonData,
216 final OffsetDateTime observedTimestamp) {
217 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
218 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
219 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
220 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
221 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
225 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
226 description = "Time taken to update a batch of data nodes and descendants")
227 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
228 final Map<String, String> nodesJsonData,
229 final OffsetDateTime observedTimestamp) {
230 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
231 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
232 final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
233 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
234 nodesJsonData.keySet().forEach(nodeXpath ->
235 processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
239 @Timed(value = "cps.data.service.list.update",
240 description = "Time taken to update a list")
241 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
242 final String jsonData, final OffsetDateTime observedTimestamp) {
243 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
244 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
245 final Collection<DataNode> newListElements =
246 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
247 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
251 @Timed(value = "cps.data.service.list.batch.update",
252 description = "Time taken to update a batch of lists")
253 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
254 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
255 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
256 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
257 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
258 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
262 @Timed(value = "cps.data.service.datanode.delete",
263 description = "Time taken to delete a datanode")
264 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
265 final OffsetDateTime observedTimestamp) {
266 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
267 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
268 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
269 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
273 @Timed(value = "cps.data.service.datanode.batch.delete",
274 description = "Time taken to delete a batch of datanodes")
275 public void deleteDataNodes(final String dataspaceName, final String anchorName,
276 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
277 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
278 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
279 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
280 dataNodeXpaths.forEach(dataNodeXpath ->
281 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
285 @Timed(value = "cps.data.service.datanode.delete.anchor",
286 description = "Time taken to delete all datanodes for an anchor")
287 public void deleteDataNodes(final String dataspaceName, final String anchorName,
288 final OffsetDateTime observedTimestamp) {
289 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
290 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
291 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
292 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
296 @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
297 description = "Time taken to delete all datanodes for multiple anchors")
298 public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
299 final OffsetDateTime observedTimestamp) {
300 cpsValidator.validateNameCharacters(dataspaceName);
301 cpsValidator.validateNameCharacters(anchorNames);
302 for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
303 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
305 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
309 @Timed(value = "cps.data.service.list.delete",
310 description = "Time taken to delete a list or list element")
311 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
312 final OffsetDateTime observedTimestamp) {
313 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
314 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
315 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
316 processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
319 private DataNode buildDataNode(final Anchor anchor, final String parentNodeXpath, final String nodeData,
320 final ContentType contentType) {
321 final SchemaContext schemaContext = getSchemaContext(anchor);
323 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
324 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
325 return new DataNodeBuilder().withContainerNode(containerNode).build();
328 final ContainerNode containerNode =
329 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
331 return new DataNodeBuilder()
332 .withParentNodeXpath(parentNodeXpath)
333 .withContainerNode(containerNode)
337 private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
338 return nodesJsonData.entrySet().stream().map(nodeJsonData ->
339 buildDataNode(anchor, nodeJsonData.getKey(),
340 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
343 private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
344 final String nodeData, final ContentType contentType) {
345 final SchemaContext schemaContext = getSchemaContext(anchor);
347 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
348 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
349 final Collection<DataNode> dataNodes = new DataNodeBuilder()
350 .withContainerNode(containerNode)
352 if (dataNodes.isEmpty()) {
353 throw new DataValidationException("Invalid data.", "No data nodes provided");
357 final ContainerNode containerNode =
358 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
359 final Collection<DataNode> dataNodes = new DataNodeBuilder()
360 .withParentNodeXpath(parentNodeXpath)
361 .withContainerNode(containerNode)
363 if (dataNodes.isEmpty()) {
364 throw new DataValidationException("Invalid data.", "No data nodes provided");
369 private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
370 final Collection<String> nodeDataList,
371 final ContentType contentType) {
372 return nodeDataList.stream()
373 .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
374 .collect(Collectors.toList());
377 private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
378 final Operation operation, final OffsetDateTime observedTimestamp) {
380 notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
381 } catch (final Exception exception) {
382 //If async message can't be queued for notification service, the initial request should not failed.
383 log.error("Failed to send message to notification service", exception);
387 private SchemaContext getSchemaContext(final Anchor anchor) {
388 return yangTextSchemaSourceSetCache
389 .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
392 private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
393 if (dataNodeUpdate == null) {
396 cpsDataPersistenceService.batchUpdateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
397 Collections.singletonMap(dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves()));
398 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
399 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
400 processDataNodeUpdate(anchor, childDataNodeUpdate);