2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021-2023 Nordix Foundation
4 * Modifications Copyright (C) 2020-2022 Bell Canada.
5 * Modifications Copyright (C) 2021 Pantheon.tech
6 * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7 * Modifications Copyright (C) 2022 Deutsche Telekom AG
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * SPDX-License-Identifier: Apache-2.0
22 * ============LICENSE_END=========================================================
25 package org.onap.cps.api.impl;
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
31 import io.micrometer.core.annotation.Timed;
32 import java.time.OffsetDateTime;
33 import java.util.Collection;
35 import java.util.stream.Collectors;
36 import lombok.RequiredArgsConstructor;
37 import lombok.extern.slf4j.Slf4j;
38 import org.onap.cps.api.CpsAdminService;
39 import org.onap.cps.api.CpsDataService;
40 import org.onap.cps.notification.NotificationService;
41 import org.onap.cps.notification.Operation;
42 import org.onap.cps.spi.CpsDataPersistenceService;
43 import org.onap.cps.spi.FetchDescendantsOption;
44 import org.onap.cps.spi.exceptions.DataValidationException;
45 import org.onap.cps.spi.model.Anchor;
46 import org.onap.cps.spi.model.DataNode;
47 import org.onap.cps.spi.model.DataNodeBuilder;
48 import org.onap.cps.spi.utils.CpsValidator;
49 import org.onap.cps.utils.ContentType;
50 import org.onap.cps.utils.TimedYangParser;
51 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.springframework.stereotype.Service;
57 @RequiredArgsConstructor
58 public class CpsDataServiceImpl implements CpsDataService {
60 private static final String ROOT_NODE_XPATH = "/";
61 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
63 private final CpsDataPersistenceService cpsDataPersistenceService;
64 private final CpsAdminService cpsAdminService;
65 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
66 private final NotificationService notificationService;
67 private final CpsValidator cpsValidator;
68 private final TimedYangParser timedYangParser;
71 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
72 final OffsetDateTime observedTimestamp) {
73 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
77 @Timed(value = "cps.data.service.datanode.root.save",
78 description = "Time taken to save a root data node")
79 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
80 final OffsetDateTime observedTimestamp, final ContentType contentType) {
81 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
82 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
83 final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
84 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
85 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
89 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
90 final String nodeData, final OffsetDateTime observedTimestamp) {
91 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
95 @Timed(value = "cps.data.service.datanode.child.save",
96 description = "Time taken to save a child data node")
97 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
98 final String nodeData, final OffsetDateTime observedTimestamp,
99 final ContentType contentType) {
100 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
101 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
102 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
103 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
104 processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
108 @Timed(value = "cps.data.service.list.element.save",
109 description = "Time taken to save a list element")
110 public void saveListElements(final String dataspaceName, final String anchorName,
111 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
112 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
113 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
114 final Collection<DataNode> listElementDataNodeCollection =
115 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
116 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
117 listElementDataNodeCollection);
118 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
122 @Timed(value = "cps.data.service.list.element.batch.save",
123 description = "Time taken to save a batch of list elements")
124 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
125 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
126 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
127 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
128 final Collection<Collection<DataNode>> listElementDataNodeCollections =
129 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
130 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
131 listElementDataNodeCollections);
132 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
136 @Timed(value = "cps.data.service.datanode.get",
137 description = "Time taken to get data nodes for an xpath")
138 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
140 final FetchDescendantsOption fetchDescendantsOption) {
141 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
142 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
146 @Timed(value = "cps.data.service.datanode.batch.get",
147 description = "Time taken to get a batch of data nodes")
148 public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
149 final Collection<String> xpaths,
150 final FetchDescendantsOption fetchDescendantsOption) {
151 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
152 return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
153 fetchDescendantsOption);
157 @Timed(value = "cps.data.service.datanode.leaves.update",
158 description = "Time taken to get a batch of data nodes")
159 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
160 final String jsonData, final OffsetDateTime observedTimestamp) {
161 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
162 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
163 final Collection<DataNode> dataNodesInPatch = buildDataNodes(anchor, parentNodeXpath, jsonData,
165 if (dataNodesInPatch.size() > 1) {
166 throw new DataValidationException("Operation is not supported for multiple data nodes",
167 "Number of data nodes present: " + dataNodesInPatch.size());
169 cpsDataPersistenceService.updateDataLeaves(dataspaceName, anchorName,
170 dataNodesInPatch.iterator().next().getXpath(),
171 dataNodesInPatch.iterator().next().getLeaves());
172 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
176 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
177 description = "Time taken to update data node leaves and existing descendants leaves")
178 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
179 final String parentNodeXpath,
180 final String dataNodeUpdatesAsJson,
181 final OffsetDateTime observedTimestamp) {
182 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
183 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
184 final Collection<DataNode> dataNodeUpdates =
185 buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
186 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
187 processDataNodeUpdate(anchor, dataNodeUpdate);
189 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
193 public String startSession() {
194 return cpsDataPersistenceService.startSession();
198 public void closeSession(final String sessionId) {
199 cpsDataPersistenceService.closeSession(sessionId);
203 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
204 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
208 public void lockAnchor(final String sessionID, final String dataspaceName,
209 final String anchorName, final Long timeoutInMilliseconds) {
210 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
214 @Timed(value = "cps.data.service.datanode.descendants.update",
215 description = "Time taken to update a data node and descendants")
216 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
217 final String parentNodeXpath, final String jsonData,
218 final OffsetDateTime observedTimestamp) {
219 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
220 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
221 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
222 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
223 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
227 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
228 description = "Time taken to update a batch of data nodes and descendants")
229 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
230 final Map<String, String> nodesJsonData,
231 final OffsetDateTime observedTimestamp) {
232 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
233 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
234 final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
235 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
236 nodesJsonData.keySet().forEach(nodeXpath ->
237 processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
241 @Timed(value = "cps.data.service.list.update",
242 description = "Time taken to update a list")
243 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
244 final String jsonData, final OffsetDateTime observedTimestamp) {
245 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
246 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
247 final Collection<DataNode> newListElements =
248 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
249 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
253 @Timed(value = "cps.data.service.list.batch.update",
254 description = "Time taken to update a batch of lists")
255 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
256 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
257 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
258 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
259 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
260 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
264 @Timed(value = "cps.data.service.datanode.delete",
265 description = "Time taken to delete a datanode")
266 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
267 final OffsetDateTime observedTimestamp) {
268 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
269 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
270 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
271 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
275 @Timed(value = "cps.data.service.datanode.batch.delete",
276 description = "Time taken to delete a batch of datanodes")
277 public void deleteDataNodes(final String dataspaceName, final String anchorName,
278 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
279 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
280 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
281 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
282 dataNodeXpaths.forEach(dataNodeXpath ->
283 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
287 @Timed(value = "cps.data.service.datanode.delete.anchor",
288 description = "Time taken to delete all datanodes for an anchor")
289 public void deleteDataNodes(final String dataspaceName, final String anchorName,
290 final OffsetDateTime observedTimestamp) {
291 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
292 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
293 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
294 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
298 @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
299 description = "Time taken to delete all datanodes for multiple anchors")
300 public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
301 final OffsetDateTime observedTimestamp) {
302 cpsValidator.validateNameCharacters(dataspaceName);
303 cpsValidator.validateNameCharacters(anchorNames);
304 for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
305 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
307 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
311 @Timed(value = "cps.data.service.list.delete",
312 description = "Time taken to delete a list or list element")
313 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
314 final OffsetDateTime observedTimestamp) {
315 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
316 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
317 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
318 processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
321 private DataNode buildDataNode(final Anchor anchor, final String parentNodeXpath, final String nodeData,
322 final ContentType contentType) {
323 final SchemaContext schemaContext = getSchemaContext(anchor);
325 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
326 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
327 return new DataNodeBuilder().withContainerNode(containerNode).build();
330 final ContainerNode containerNode =
331 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
333 return new DataNodeBuilder()
334 .withParentNodeXpath(parentNodeXpath)
335 .withContainerNode(containerNode)
339 private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
340 return nodesJsonData.entrySet().stream().map(nodeJsonData ->
341 buildDataNode(anchor, nodeJsonData.getKey(),
342 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
345 private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
346 final String nodeData, final ContentType contentType) {
347 final SchemaContext schemaContext = getSchemaContext(anchor);
349 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
350 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
351 final Collection<DataNode> dataNodes = new DataNodeBuilder()
352 .withContainerNode(containerNode)
354 if (dataNodes.isEmpty()) {
355 throw new DataValidationException("Invalid data.", "No data nodes provided");
359 final ContainerNode containerNode =
360 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
361 final Collection<DataNode> dataNodes = new DataNodeBuilder()
362 .withParentNodeXpath(parentNodeXpath)
363 .withContainerNode(containerNode)
365 if (dataNodes.isEmpty()) {
366 throw new DataValidationException("Invalid data.", "No data nodes provided");
371 private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
372 final Collection<String> nodeDataList,
373 final ContentType contentType) {
374 return nodeDataList.stream()
375 .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
376 .collect(Collectors.toList());
379 private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
380 final Operation operation, final OffsetDateTime observedTimestamp) {
382 notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
383 } catch (final Exception exception) {
384 //If async message can't be queued for notification service, the initial request should not failed.
385 log.error("Failed to send message to notification service", exception);
389 private SchemaContext getSchemaContext(final Anchor anchor) {
390 return yangTextSchemaSourceSetCache
391 .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
394 private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
395 if (dataNodeUpdate == null) {
398 cpsDataPersistenceService.updateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
399 dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves());
400 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
401 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
402 processDataNodeUpdate(anchor, childDataNodeUpdate);