2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021-2023 Nordix Foundation
4 * Modifications Copyright (C) 2020-2022 Bell Canada.
5 * Modifications Copyright (C) 2021 Pantheon.tech
6 * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7 * Modifications Copyright (C) 2022 Deutsche Telekom AG
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * SPDX-License-Identifier: Apache-2.0
22 * ============LICENSE_END=========================================================
25 package org.onap.cps.api.impl;
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
31 import io.micrometer.core.annotation.Timed;
32 import java.time.OffsetDateTime;
33 import java.util.Collection;
35 import java.util.stream.Collectors;
36 import lombok.RequiredArgsConstructor;
37 import lombok.extern.slf4j.Slf4j;
38 import org.onap.cps.api.CpsAdminService;
39 import org.onap.cps.api.CpsDataService;
40 import org.onap.cps.notification.NotificationService;
41 import org.onap.cps.notification.Operation;
42 import org.onap.cps.spi.CpsDataPersistenceService;
43 import org.onap.cps.spi.FetchDescendantsOption;
44 import org.onap.cps.spi.exceptions.DataValidationException;
45 import org.onap.cps.spi.model.Anchor;
46 import org.onap.cps.spi.model.DataNode;
47 import org.onap.cps.spi.model.DataNodeBuilder;
48 import org.onap.cps.spi.utils.CpsValidator;
49 import org.onap.cps.utils.ContentType;
50 import org.onap.cps.utils.TimedYangParser;
51 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.springframework.stereotype.Service;
57 @RequiredArgsConstructor
58 public class CpsDataServiceImpl implements CpsDataService {
60 private static final String ROOT_NODE_XPATH = "/";
61 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
63 private final CpsDataPersistenceService cpsDataPersistenceService;
64 private final CpsAdminService cpsAdminService;
65 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
66 private final NotificationService notificationService;
67 private final CpsValidator cpsValidator;
68 private final TimedYangParser timedYangParser;
71 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
72 final OffsetDateTime observedTimestamp) {
73 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
77 @Timed(value = "cps.data.service.datanode.root.save",
78 description = "Time taken to save a root data node")
79 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
80 final OffsetDateTime observedTimestamp, final ContentType contentType) {
81 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
82 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
83 final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
84 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
85 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
89 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
90 final String nodeData, final OffsetDateTime observedTimestamp) {
91 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
95 @Timed(value = "cps.data.service.datanode.child.save",
96 description = "Time taken to save a child data node")
97 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
98 final String nodeData, final OffsetDateTime observedTimestamp,
99 final ContentType contentType) {
100 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
101 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
102 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
103 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
104 processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
108 @Timed(value = "cps.data.service.list.element.save",
109 description = "Time taken to save a list element")
110 public void saveListElements(final String dataspaceName, final String anchorName,
111 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
112 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
113 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
114 final Collection<DataNode> listElementDataNodeCollection =
115 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
116 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
117 listElementDataNodeCollection);
118 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
122 @Timed(value = "cps.data.service.list.element.batch.save",
123 description = "Time taken to save a batch of list elements")
124 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
125 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
126 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
127 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
128 final Collection<Collection<DataNode>> listElementDataNodeCollections =
129 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
130 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
131 listElementDataNodeCollections);
132 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
136 @Timed(value = "cps.data.service.datanode.get",
137 description = "Time taken to get data nodes for an xpath")
138 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
140 final FetchDescendantsOption fetchDescendantsOption) {
141 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
142 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
146 @Timed(value = "cps.data.service.datanode.batch.get",
147 description = "Time taken to get a batch of data nodes")
148 public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
149 final Collection<String> xpaths,
150 final FetchDescendantsOption fetchDescendantsOption) {
151 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
152 return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
153 fetchDescendantsOption);
157 @Timed(value = "cps.data.service.datanode.leaves.update",
158 description = "Time taken to get a batch of data nodes")
159 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
160 final String jsonData, final OffsetDateTime observedTimestamp) {
161 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
162 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
163 final DataNode dataNode = buildDataNode(anchor, parentNodeXpath, jsonData, ContentType.JSON);
164 cpsDataPersistenceService.updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(),
165 dataNode.getLeaves());
166 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
170 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
171 description = "Time taken to update data node leaves and existing descendants leaves")
172 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
173 final String parentNodeXpath,
174 final String dataNodeUpdatesAsJson,
175 final OffsetDateTime observedTimestamp) {
176 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
177 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
178 final Collection<DataNode> dataNodeUpdates =
179 buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
180 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
181 processDataNodeUpdate(anchor, dataNodeUpdate);
183 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
187 public String startSession() {
188 return cpsDataPersistenceService.startSession();
192 public void closeSession(final String sessionId) {
193 cpsDataPersistenceService.closeSession(sessionId);
197 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
198 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
202 public void lockAnchor(final String sessionID, final String dataspaceName,
203 final String anchorName, final Long timeoutInMilliseconds) {
204 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
208 @Timed(value = "cps.data.service.datanode.descendants.update",
209 description = "Time taken to update a data node and descendants")
210 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
211 final String parentNodeXpath, final String jsonData,
212 final OffsetDateTime observedTimestamp) {
213 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
214 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
215 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
216 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
217 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
221 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
222 description = "Time taken to update a batch of data nodes and descendants")
223 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
224 final Map<String, String> nodesJsonData,
225 final OffsetDateTime observedTimestamp) {
226 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
227 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
228 final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
229 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
230 nodesJsonData.keySet().forEach(nodeXpath ->
231 processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
235 @Timed(value = "cps.data.service.list.update",
236 description = "Time taken to update a list")
237 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
238 final String jsonData, final OffsetDateTime observedTimestamp) {
239 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
240 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
241 final Collection<DataNode> newListElements =
242 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
243 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
247 @Timed(value = "cps.data.service.list.batch.update",
248 description = "Time taken to update a batch of lists")
249 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
250 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
251 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
252 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
253 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
254 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
258 @Timed(value = "cps.data.service.datanode.delete",
259 description = "Time taken to delete a datanode")
260 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
261 final OffsetDateTime observedTimestamp) {
262 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
263 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
264 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
265 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
269 @Timed(value = "cps.data.service.datanode.batch.delete",
270 description = "Time taken to delete a batch of datanodes")
271 public void deleteDataNodes(final String dataspaceName, final String anchorName,
272 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
273 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
274 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
275 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
276 dataNodeXpaths.forEach(dataNodeXpath ->
277 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
281 @Timed(value = "cps.data.service.datanode.delete.anchor",
282 description = "Time taken to delete all datanodes for an anchor")
283 public void deleteDataNodes(final String dataspaceName, final String anchorName,
284 final OffsetDateTime observedTimestamp) {
285 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
286 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
287 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
288 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
292 @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
293 description = "Time taken to delete all datanodes for multiple anchors")
294 public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
295 final OffsetDateTime observedTimestamp) {
296 cpsValidator.validateNameCharacters(dataspaceName);
297 cpsValidator.validateNameCharacters(anchorNames);
298 for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
299 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
301 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
305 @Timed(value = "cps.data.service.list.delete",
306 description = "Time taken to delete a list or list element")
307 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
308 final OffsetDateTime observedTimestamp) {
309 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
310 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
311 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
312 processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
315 private DataNode buildDataNode(final Anchor anchor, final String parentNodeXpath, final String nodeData,
316 final ContentType contentType) {
317 final SchemaContext schemaContext = getSchemaContext(anchor);
319 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
320 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
321 return new DataNodeBuilder().withContainerNode(containerNode).build();
324 final ContainerNode containerNode =
325 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
327 return new DataNodeBuilder()
328 .withParentNodeXpath(parentNodeXpath)
329 .withContainerNode(containerNode)
333 private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
334 return nodesJsonData.entrySet().stream().map(nodeJsonData ->
335 buildDataNode(anchor, nodeJsonData.getKey(),
336 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
339 private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
340 final String nodeData, final ContentType contentType) {
341 final SchemaContext schemaContext = getSchemaContext(anchor);
343 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
344 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
345 final Collection<DataNode> dataNodes = new DataNodeBuilder()
346 .withContainerNode(containerNode)
348 if (dataNodes.isEmpty()) {
349 throw new DataValidationException("Invalid data.", "No data nodes provided");
353 final ContainerNode containerNode =
354 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
355 final Collection<DataNode> dataNodes = new DataNodeBuilder()
356 .withParentNodeXpath(parentNodeXpath)
357 .withContainerNode(containerNode)
359 if (dataNodes.isEmpty()) {
360 throw new DataValidationException("Invalid data.", "No data nodes provided");
365 private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
366 final Collection<String> nodeDataList,
367 final ContentType contentType) {
368 return nodeDataList.stream()
369 .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
370 .collect(Collectors.toList());
373 private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
374 final Operation operation, final OffsetDateTime observedTimestamp) {
376 notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
377 } catch (final Exception exception) {
378 //If async message can't be queued for notification service, the initial request should not failed.
379 log.error("Failed to send message to notification service", exception);
383 private SchemaContext getSchemaContext(final Anchor anchor) {
384 return yangTextSchemaSourceSetCache
385 .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
388 private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
389 if (dataNodeUpdate == null) {
392 cpsDataPersistenceService.updateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
393 dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves());
394 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
395 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
396 processDataNodeUpdate(anchor, childDataNodeUpdate);