2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021-2023 Nordix Foundation
4 * Modifications Copyright (C) 2020-2022 Bell Canada.
5 * Modifications Copyright (C) 2021 Pantheon.tech
6 * Modifications Copyright (C) 2022 TechMahindra Ltd.
7 * Modifications Copyright (C) 2022 Deutsche Telekom AG
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * SPDX-License-Identifier: Apache-2.0
22 * ============LICENSE_END=========================================================
25 package org.onap.cps.api.impl;
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
31 import io.micrometer.core.annotation.Timed;
32 import java.time.OffsetDateTime;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.List;
37 import java.util.stream.Collectors;
38 import lombok.RequiredArgsConstructor;
39 import lombok.extern.slf4j.Slf4j;
40 import org.onap.cps.api.CpsAdminService;
41 import org.onap.cps.api.CpsDataService;
42 import org.onap.cps.notification.NotificationService;
43 import org.onap.cps.notification.Operation;
44 import org.onap.cps.spi.CpsDataPersistenceService;
45 import org.onap.cps.spi.FetchDescendantsOption;
46 import org.onap.cps.spi.exceptions.DataValidationException;
47 import org.onap.cps.spi.model.Anchor;
48 import org.onap.cps.spi.model.DataNode;
49 import org.onap.cps.spi.model.DataNodeBuilder;
50 import org.onap.cps.spi.utils.CpsValidator;
51 import org.onap.cps.utils.ContentType;
52 import org.onap.cps.utils.TimedYangParser;
53 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.springframework.stereotype.Service;
59 @RequiredArgsConstructor
60 public class CpsDataServiceImpl implements CpsDataService {
62 private static final String ROOT_NODE_XPATH = "/";
63 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
65 private final CpsDataPersistenceService cpsDataPersistenceService;
66 private final CpsAdminService cpsAdminService;
67 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
68 private final NotificationService notificationService;
69 private final CpsValidator cpsValidator;
70 private final TimedYangParser timedYangParser;
73 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
74 final OffsetDateTime observedTimestamp) {
75 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
79 @Timed(value = "cps.data.service.datanode.root.save",
80 description = "Time taken to save a root data node")
81 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
82 final OffsetDateTime observedTimestamp, final ContentType contentType) {
83 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
84 final Collection<DataNode> dataNodes =
85 buildDataNodes(dataspaceName, anchorName, ROOT_NODE_XPATH, nodeData, contentType);
86 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
87 processDataUpdatedEventAsync(dataspaceName, anchorName, ROOT_NODE_XPATH, CREATE, observedTimestamp);
91 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
92 final String nodeData, final OffsetDateTime observedTimestamp) {
93 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
97 @Timed(value = "cps.data.service.datanode.child.save",
98 description = "Time taken to save a child data node")
99 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
100 final String nodeData, final OffsetDateTime observedTimestamp,
101 final ContentType contentType) {
102 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
103 final Collection<DataNode> dataNodes =
104 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, nodeData, contentType);
105 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
106 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, CREATE, observedTimestamp);
110 @Timed(value = "cps.data.service.list.element.save",
111 description = "Time taken to save a list element")
112 public void saveListElements(final String dataspaceName, final String anchorName,
113 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
114 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
115 final Collection<DataNode> listElementDataNodeCollection =
116 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
117 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
118 listElementDataNodeCollection);
119 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
123 @Timed(value = "cps.data.service.list.element.batch.save",
124 description = "Time taken to save a batch of list elements")
125 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
126 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
127 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
128 final Collection<Collection<DataNode>> listElementDataNodeCollections =
129 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonDataList, ContentType.JSON);
130 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
131 listElementDataNodeCollections);
132 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
136 @Timed(value = "cps.data.service.datanode.get",
137 description = "Time taken to get a data node")
138 public DataNode getDataNode(final String dataspaceName, final String anchorName, final String xpath,
139 final FetchDescendantsOption fetchDescendantsOption) {
140 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
141 return cpsDataPersistenceService.getDataNode(dataspaceName, anchorName, xpath, fetchDescendantsOption);
145 @Timed(value = "cps.data.service.datanode.batch.get",
146 description = "Time taken to get a batch of data nodes")
147 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
148 final Collection<String> xpaths,
149 final FetchDescendantsOption fetchDescendantsOption) {
150 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
151 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpaths, fetchDescendantsOption);
155 @Timed(value = "cps.data.service.datanode.leaves.update",
156 description = "Time taken to get a batch of data nodes")
157 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
158 final String jsonData, final OffsetDateTime observedTimestamp) {
159 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
160 final DataNode dataNode = buildDataNode(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
161 cpsDataPersistenceService
162 .updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves());
163 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
167 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
168 description = "Time taken to update data node leaves and existing descendants leaves")
169 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
170 final String parentNodeXpath,
171 final String dataNodeUpdatesAsJson,
172 final OffsetDateTime observedTimestamp) {
173 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
174 final Collection<DataNode> dataNodeUpdates =
175 buildDataNodes(dataspaceName, anchorName,
176 parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
177 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
178 processDataNodeUpdate(dataspaceName, anchorName, dataNodeUpdate);
180 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
184 public String startSession() {
185 return cpsDataPersistenceService.startSession();
189 public void closeSession(final String sessionId) {
190 cpsDataPersistenceService.closeSession(sessionId);
194 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
195 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
199 public void lockAnchor(final String sessionID, final String dataspaceName,
200 final String anchorName, final Long timeoutInMilliseconds) {
201 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
205 @Timed(value = "cps.data.service.datanode.descendants.update",
206 description = "Time taken to update a data node and descendants")
207 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
208 final String parentNodeXpath, final String jsonData,
209 final OffsetDateTime observedTimestamp) {
210 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
211 final Collection<DataNode> dataNodes =
212 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
213 final ArrayList<DataNode> nodes = new ArrayList<>(dataNodes);
214 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, nodes);
215 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
219 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
220 description = "Time taken to update a batch of data nodes and descendants")
221 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
222 final Map<String, String> nodesJsonData,
223 final OffsetDateTime observedTimestamp) {
224 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
225 final List<DataNode> dataNodes = buildDataNodes(dataspaceName, anchorName, nodesJsonData);
226 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
227 nodesJsonData.keySet().forEach(nodeXpath ->
228 processDataUpdatedEventAsync(dataspaceName, anchorName, nodeXpath,
229 UPDATE, observedTimestamp));
233 @Timed(value = "cps.data.service.list.update",
234 description = "Time taken to update a list")
235 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
236 final String jsonData, final OffsetDateTime observedTimestamp) {
237 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
238 final Collection<DataNode> newListElements =
239 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
240 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
244 @Timed(value = "cps.data.service.list.batch.update",
245 description = "Time taken to update a batch of lists")
246 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
247 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
248 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
249 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
250 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
254 @Timed(value = "cps.data.service.datanode.delete",
255 description = "Time taken to delete a datanode")
256 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
257 final OffsetDateTime observedTimestamp) {
258 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
259 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
260 processDataUpdatedEventAsync(dataspaceName, anchorName, dataNodeXpath, DELETE, observedTimestamp);
264 @Timed(value = "cps.data.service.datanode.batch.delete",
265 description = "Time taken to delete a batch of datanodes")
266 public void deleteDataNodes(final String dataspaceName, final String anchorName,
267 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
268 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
269 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
270 dataNodeXpaths.forEach(dataNodeXpath ->
271 processDataUpdatedEventAsync(dataspaceName, anchorName, dataNodeXpath, DELETE, observedTimestamp));
275 @Timed(value = "cps.data.service.datanode.all.delete",
276 description = "Time taken to delete all datanodes")
277 public void deleteDataNodes(final String dataspaceName, final String anchorName,
278 final OffsetDateTime observedTimestamp) {
279 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
280 processDataUpdatedEventAsync(dataspaceName, anchorName, ROOT_NODE_XPATH, DELETE, observedTimestamp);
281 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
285 @Timed(value = "cps.data.service.list.delete",
286 description = "Time taken to delete a list or list element")
287 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
288 final OffsetDateTime observedTimestamp) {
289 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
290 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
291 processDataUpdatedEventAsync(dataspaceName, anchorName, listNodeXpath, DELETE, observedTimestamp);
294 private DataNode buildDataNode(final String dataspaceName, final String anchorName,
295 final String parentNodeXpath, final String nodeData,
296 final ContentType contentType) {
298 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
299 final SchemaContext schemaContext = getSchemaContext(dataspaceName, anchor.getSchemaSetName());
301 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
302 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
303 return new DataNodeBuilder().withContainerNode(containerNode).build();
306 final ContainerNode containerNode =
307 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
309 return new DataNodeBuilder()
310 .withParentNodeXpath(parentNodeXpath)
311 .withContainerNode(containerNode)
315 private List<DataNode> buildDataNodes(final String dataspaceName, final String anchorName,
316 final Map<String, String> nodesJsonData) {
317 return nodesJsonData.entrySet().stream().map(nodeJsonData ->
318 buildDataNode(dataspaceName, anchorName, nodeJsonData.getKey(),
319 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
322 private Collection<DataNode> buildDataNodes(final String dataspaceName,
323 final String anchorName,
324 final String parentNodeXpath,
325 final String nodeData,
326 final ContentType contentType) {
328 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
329 final SchemaContext schemaContext = getSchemaContext(dataspaceName, anchor.getSchemaSetName());
331 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
332 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
333 final Collection<DataNode> dataNodes = new DataNodeBuilder()
334 .withContainerNode(containerNode)
336 if (dataNodes.isEmpty()) {
337 throw new DataValidationException("Invalid data.", "No data nodes provided");
341 final ContainerNode containerNode =
342 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
343 final Collection<DataNode> dataNodes = new DataNodeBuilder()
344 .withParentNodeXpath(parentNodeXpath)
345 .withContainerNode(containerNode)
347 if (dataNodes.isEmpty()) {
348 throw new DataValidationException("Invalid data.", "No data nodes provided");
353 private Collection<Collection<DataNode>> buildDataNodes(final String dataspaceName, final String anchorName,
354 final String parentNodeXpath, final Collection<String> nodeDataList, final ContentType contentType) {
355 return nodeDataList.stream()
356 .map(nodeData -> buildDataNodes(dataspaceName, anchorName, parentNodeXpath, nodeData, contentType))
357 .collect(Collectors.toList());
360 private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName, final String xpath,
361 final Operation operation, final OffsetDateTime observedTimestamp) {
363 notificationService.processDataUpdatedEvent(dataspaceName, anchorName, xpath, operation, observedTimestamp);
364 } catch (final Exception exception) {
365 //If async message can't be queued for notification service, the initial request should not failed.
366 log.error("Failed to send message to notification service", exception);
370 private SchemaContext getSchemaContext(final String dataspaceName, final String schemaSetName) {
371 return yangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName).getSchemaContext();
374 private void processDataNodeUpdate(final String dataspaceName, final String anchorName,
375 final DataNode dataNodeUpdate) {
376 if (dataNodeUpdate == null) {
379 cpsDataPersistenceService.updateDataLeaves(dataspaceName, anchorName, dataNodeUpdate.getXpath(),
380 dataNodeUpdate.getLeaves());
381 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
382 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
383 processDataNodeUpdate(dataspaceName, anchorName, childDataNodeUpdate);