2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021-2023 Nordix Foundation
4 * Modifications Copyright (C) 2020-2022 Bell Canada.
5 * Modifications Copyright (C) 2021 Pantheon.tech
6 * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7 * Modifications Copyright (C) 2022 Deutsche Telekom AG
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * SPDX-License-Identifier: Apache-2.0
22 * ============LICENSE_END=========================================================
25 package org.onap.cps.api.impl;
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
31 import io.micrometer.core.annotation.Timed;
32 import java.time.OffsetDateTime;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.List;
37 import java.util.stream.Collectors;
38 import lombok.RequiredArgsConstructor;
39 import lombok.extern.slf4j.Slf4j;
40 import org.onap.cps.api.CpsAdminService;
41 import org.onap.cps.api.CpsDataService;
42 import org.onap.cps.notification.NotificationService;
43 import org.onap.cps.notification.Operation;
44 import org.onap.cps.spi.CpsDataPersistenceService;
45 import org.onap.cps.spi.FetchDescendantsOption;
46 import org.onap.cps.spi.exceptions.DataValidationException;
47 import org.onap.cps.spi.model.Anchor;
48 import org.onap.cps.spi.model.DataNode;
49 import org.onap.cps.spi.model.DataNodeBuilder;
50 import org.onap.cps.spi.utils.CpsValidator;
51 import org.onap.cps.utils.ContentType;
52 import org.onap.cps.utils.TimedYangParser;
53 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.springframework.stereotype.Service;
59 @RequiredArgsConstructor
60 public class CpsDataServiceImpl implements CpsDataService {
62 private static final String ROOT_NODE_XPATH = "/";
63 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
65 private final CpsDataPersistenceService cpsDataPersistenceService;
66 private final CpsAdminService cpsAdminService;
67 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
68 private final NotificationService notificationService;
69 private final CpsValidator cpsValidator;
70 private final TimedYangParser timedYangParser;
73 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
74 final OffsetDateTime observedTimestamp) {
75 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
79 @Timed(value = "cps.data.service.datanode.root.save",
80 description = "Time taken to save a root data node")
81 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
82 final OffsetDateTime observedTimestamp, final ContentType contentType) {
83 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
84 final Collection<DataNode> dataNodes =
85 buildDataNodes(dataspaceName, anchorName, ROOT_NODE_XPATH, nodeData, contentType);
86 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
87 processDataUpdatedEventAsync(dataspaceName, anchorName, ROOT_NODE_XPATH, CREATE, observedTimestamp);
91 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
92 final String nodeData, final OffsetDateTime observedTimestamp) {
93 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
97 @Timed(value = "cps.data.service.datanode.child.save",
98 description = "Time taken to save a child data node")
99 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
100 final String nodeData, final OffsetDateTime observedTimestamp,
101 final ContentType contentType) {
102 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
103 final Collection<DataNode> dataNodes =
104 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, nodeData, contentType);
105 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
106 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, CREATE, observedTimestamp);
110 @Timed(value = "cps.data.service.list.element.save",
111 description = "Time taken to save a list element")
112 public void saveListElements(final String dataspaceName, final String anchorName,
113 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
114 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
115 final Collection<DataNode> listElementDataNodeCollection =
116 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
117 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
118 listElementDataNodeCollection);
119 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
123 @Timed(value = "cps.data.service.list.element.batch.save",
124 description = "Time taken to save a batch of list elements")
125 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
126 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
127 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
128 final Collection<Collection<DataNode>> listElementDataNodeCollections =
129 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonDataList, ContentType.JSON);
130 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
131 listElementDataNodeCollections);
132 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
136 @Timed(value = "cps.data.service.datanode.get",
137 description = "Time taken to get data nodes for an xpath")
138 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
140 final FetchDescendantsOption fetchDescendantsOption) {
141 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
142 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
146 @Timed(value = "cps.data.service.datanode.batch.get",
147 description = "Time taken to get a batch of data nodes")
148 public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
149 final Collection<String> xpaths,
150 final FetchDescendantsOption fetchDescendantsOption) {
151 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
152 return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
153 fetchDescendantsOption);
157 @Timed(value = "cps.data.service.datanode.leaves.update",
158 description = "Time taken to get a batch of data nodes")
159 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
160 final String jsonData, final OffsetDateTime observedTimestamp) {
161 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
162 final DataNode dataNode = buildDataNode(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
163 cpsDataPersistenceService
164 .updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves());
165 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
169 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
170 description = "Time taken to update data node leaves and existing descendants leaves")
171 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
172 final String parentNodeXpath,
173 final String dataNodeUpdatesAsJson,
174 final OffsetDateTime observedTimestamp) {
175 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
176 final Collection<DataNode> dataNodeUpdates =
177 buildDataNodes(dataspaceName, anchorName,
178 parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
179 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
180 processDataNodeUpdate(dataspaceName, anchorName, dataNodeUpdate);
182 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
186 public String startSession() {
187 return cpsDataPersistenceService.startSession();
191 public void closeSession(final String sessionId) {
192 cpsDataPersistenceService.closeSession(sessionId);
196 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
197 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
201 public void lockAnchor(final String sessionID, final String dataspaceName,
202 final String anchorName, final Long timeoutInMilliseconds) {
203 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
207 @Timed(value = "cps.data.service.datanode.descendants.update",
208 description = "Time taken to update a data node and descendants")
209 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
210 final String parentNodeXpath, final String jsonData,
211 final OffsetDateTime observedTimestamp) {
212 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
213 final Collection<DataNode> dataNodes =
214 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
215 final ArrayList<DataNode> nodes = new ArrayList<>(dataNodes);
216 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, nodes);
217 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
221 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
222 description = "Time taken to update a batch of data nodes and descendants")
223 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
224 final Map<String, String> nodesJsonData,
225 final OffsetDateTime observedTimestamp) {
226 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
227 final List<DataNode> dataNodes = buildDataNodes(dataspaceName, anchorName, nodesJsonData);
228 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
229 nodesJsonData.keySet().forEach(nodeXpath ->
230 processDataUpdatedEventAsync(dataspaceName, anchorName, nodeXpath,
231 UPDATE, observedTimestamp));
235 @Timed(value = "cps.data.service.list.update",
236 description = "Time taken to update a list")
237 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
238 final String jsonData, final OffsetDateTime observedTimestamp) {
239 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
240 final Collection<DataNode> newListElements =
241 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
242 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
246 @Timed(value = "cps.data.service.list.batch.update",
247 description = "Time taken to update a batch of lists")
248 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
249 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
250 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
251 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
252 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
256 @Timed(value = "cps.data.service.datanode.delete",
257 description = "Time taken to delete a datanode")
258 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
259 final OffsetDateTime observedTimestamp) {
260 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
261 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
262 processDataUpdatedEventAsync(dataspaceName, anchorName, dataNodeXpath, DELETE, observedTimestamp);
266 @Timed(value = "cps.data.service.datanode.batch.delete",
267 description = "Time taken to delete a batch of datanodes")
268 public void deleteDataNodes(final String dataspaceName, final String anchorName,
269 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
270 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
271 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
272 dataNodeXpaths.forEach(dataNodeXpath ->
273 processDataUpdatedEventAsync(dataspaceName, anchorName, dataNodeXpath, DELETE, observedTimestamp));
277 @Timed(value = "cps.data.service.datanode.delete.anchor",
278 description = "Time taken to delete all datanodes for an anchor")
279 public void deleteDataNodes(final String dataspaceName, final String anchorName,
280 final OffsetDateTime observedTimestamp) {
281 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
282 processDataUpdatedEventAsync(dataspaceName, anchorName, ROOT_NODE_XPATH, DELETE, observedTimestamp);
283 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
287 @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
288 description = "Time taken to delete all datanodes for multiple anchors")
289 public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
290 final OffsetDateTime observedTimestamp) {
291 cpsValidator.validateNameCharacters(dataspaceName);
292 cpsValidator.validateNameCharacters(anchorNames);
293 for (final String anchorName : anchorNames) {
294 processDataUpdatedEventAsync(dataspaceName, anchorName, ROOT_NODE_XPATH, DELETE, observedTimestamp);
296 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
300 @Timed(value = "cps.data.service.list.delete",
301 description = "Time taken to delete a list or list element")
302 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
303 final OffsetDateTime observedTimestamp) {
304 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
305 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
306 processDataUpdatedEventAsync(dataspaceName, anchorName, listNodeXpath, DELETE, observedTimestamp);
309 private DataNode buildDataNode(final String dataspaceName, final String anchorName,
310 final String parentNodeXpath, final String nodeData,
311 final ContentType contentType) {
313 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
314 final SchemaContext schemaContext = getSchemaContext(dataspaceName, anchor.getSchemaSetName());
316 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
317 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
318 return new DataNodeBuilder().withContainerNode(containerNode).build();
321 final ContainerNode containerNode =
322 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
324 return new DataNodeBuilder()
325 .withParentNodeXpath(parentNodeXpath)
326 .withContainerNode(containerNode)
330 private List<DataNode> buildDataNodes(final String dataspaceName, final String anchorName,
331 final Map<String, String> nodesJsonData) {
332 return nodesJsonData.entrySet().stream().map(nodeJsonData ->
333 buildDataNode(dataspaceName, anchorName, nodeJsonData.getKey(),
334 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
337 private Collection<DataNode> buildDataNodes(final String dataspaceName,
338 final String anchorName,
339 final String parentNodeXpath,
340 final String nodeData,
341 final ContentType contentType) {
343 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
344 final SchemaContext schemaContext = getSchemaContext(dataspaceName, anchor.getSchemaSetName());
346 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
347 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
348 final Collection<DataNode> dataNodes = new DataNodeBuilder()
349 .withContainerNode(containerNode)
351 if (dataNodes.isEmpty()) {
352 throw new DataValidationException("Invalid data.", "No data nodes provided");
356 final ContainerNode containerNode =
357 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
358 final Collection<DataNode> dataNodes = new DataNodeBuilder()
359 .withParentNodeXpath(parentNodeXpath)
360 .withContainerNode(containerNode)
362 if (dataNodes.isEmpty()) {
363 throw new DataValidationException("Invalid data.", "No data nodes provided");
368 private Collection<Collection<DataNode>> buildDataNodes(final String dataspaceName, final String anchorName,
369 final String parentNodeXpath, final Collection<String> nodeDataList, final ContentType contentType) {
370 return nodeDataList.stream()
371 .map(nodeData -> buildDataNodes(dataspaceName, anchorName, parentNodeXpath, nodeData, contentType))
372 .collect(Collectors.toList());
375 private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName, final String xpath,
376 final Operation operation, final OffsetDateTime observedTimestamp) {
378 notificationService.processDataUpdatedEvent(dataspaceName, anchorName, xpath, operation, observedTimestamp);
379 } catch (final Exception exception) {
380 //If async message can't be queued for notification service, the initial request should not failed.
381 log.error("Failed to send message to notification service", exception);
385 private SchemaContext getSchemaContext(final String dataspaceName, final String schemaSetName) {
386 return yangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName).getSchemaContext();
389 private void processDataNodeUpdate(final String dataspaceName, final String anchorName,
390 final DataNode dataNodeUpdate) {
391 if (dataNodeUpdate == null) {
394 cpsDataPersistenceService.updateDataLeaves(dataspaceName, anchorName, dataNodeUpdate.getXpath(),
395 dataNodeUpdate.getLeaves());
396 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
397 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
398 processDataNodeUpdate(dataspaceName, anchorName, childDataNodeUpdate);