2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021-2023 Nordix Foundation
4 * Modifications Copyright (C) 2020-2022 Bell Canada.
5 * Modifications Copyright (C) 2021 Pantheon.tech
6 * Modifications Copyright (C) 2022 TechMahindra Ltd.
7 * Modifications Copyright (C) 2022 Deutsche Telekom AG
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * SPDX-License-Identifier: Apache-2.0
22 * ============LICENSE_END=========================================================
25 package org.onap.cps.api.impl;
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
31 import io.micrometer.core.annotation.Timed;
32 import java.time.OffsetDateTime;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.List;
37 import java.util.stream.Collectors;
38 import lombok.RequiredArgsConstructor;
39 import lombok.extern.slf4j.Slf4j;
40 import org.onap.cps.api.CpsAdminService;
41 import org.onap.cps.api.CpsDataService;
42 import org.onap.cps.notification.NotificationService;
43 import org.onap.cps.notification.Operation;
44 import org.onap.cps.spi.CpsDataPersistenceService;
45 import org.onap.cps.spi.FetchDescendantsOption;
46 import org.onap.cps.spi.exceptions.DataValidationException;
47 import org.onap.cps.spi.model.Anchor;
48 import org.onap.cps.spi.model.DataNode;
49 import org.onap.cps.spi.model.DataNodeBuilder;
50 import org.onap.cps.spi.utils.CpsValidator;
51 import org.onap.cps.utils.ContentType;
52 import org.onap.cps.utils.TimedYangParser;
53 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.springframework.stereotype.Service;
59 @RequiredArgsConstructor
60 public class CpsDataServiceImpl implements CpsDataService {
62 private static final String ROOT_NODE_XPATH = "/";
63 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
65 private final CpsDataPersistenceService cpsDataPersistenceService;
66 private final CpsAdminService cpsAdminService;
67 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
68 private final NotificationService notificationService;
69 private final CpsValidator cpsValidator;
70 private final TimedYangParser timedYangParser;
73 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
74 final OffsetDateTime observedTimestamp) {
75 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
79 @Timed(value = "cps.data.service.datanode.root.save",
80 description = "Time taken to save a root data node")
81 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
82 final OffsetDateTime observedTimestamp, final ContentType contentType) {
83 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
84 final Collection<DataNode> dataNodes =
85 buildDataNodes(dataspaceName, anchorName, ROOT_NODE_XPATH, nodeData, contentType);
86 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
87 processDataUpdatedEventAsync(dataspaceName, anchorName, ROOT_NODE_XPATH, CREATE, observedTimestamp);
91 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
92 final String nodeData, final OffsetDateTime observedTimestamp) {
93 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
97 @Timed(value = "cps.data.service.datanode.child.save",
98 description = "Time taken to save a child data node")
99 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
100 final String nodeData, final OffsetDateTime observedTimestamp,
101 final ContentType contentType) {
102 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
103 final Collection<DataNode> dataNodes =
104 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, nodeData, contentType);
105 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
106 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, CREATE, observedTimestamp);
110 @Timed(value = "cps.data.service.list.element.save",
111 description = "Time taken to save a list element")
112 public void saveListElements(final String dataspaceName, final String anchorName,
113 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
114 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
115 final Collection<DataNode> listElementDataNodeCollection =
116 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
117 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
118 listElementDataNodeCollection);
119 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
123 @Timed(value = "cps.data.service.list.element.batch.save",
124 description = "Time taken to save a batch of list elements")
125 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
126 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
127 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
128 final Collection<Collection<DataNode>> listElementDataNodeCollections =
129 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonDataList, ContentType.JSON);
130 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
131 listElementDataNodeCollections);
132 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
136 @Timed(value = "cps.data.service.datanode.get",
137 description = "Time taken to get a data node")
138 public DataNode getDataNode(final String dataspaceName, final String anchorName, final String xpath,
139 final FetchDescendantsOption fetchDescendantsOption) {
140 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
141 return cpsDataPersistenceService.getDataNode(dataspaceName, anchorName, xpath, fetchDescendantsOption);
145 @Timed(value = "cps.data.service.datanode.batch.get",
146 description = "Time taken to get a batch of data nodes")
147 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
148 final Collection<String> xpaths,
149 final FetchDescendantsOption fetchDescendantsOption) {
150 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
151 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpaths, fetchDescendantsOption);
155 @Timed(value = "cps.data.service.datanode.leaves.update",
156 description = "Time taken to get a batch of data nodes")
157 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
158 final String jsonData, final OffsetDateTime observedTimestamp) {
159 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
160 final DataNode dataNode = buildDataNode(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
161 cpsDataPersistenceService
162 .updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves());
163 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
167 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
168 description = "Time taken to update data node leaves and existing descendants leaves")
169 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
170 final String parentNodeXpath,
171 final String dataNodeUpdatesAsJson,
172 final OffsetDateTime observedTimestamp) {
173 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
174 final Collection<DataNode> dataNodeUpdates =
175 buildDataNodes(dataspaceName, anchorName,
176 parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
177 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
178 processDataNodeUpdate(dataspaceName, anchorName, dataNodeUpdate);
180 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
184 public String startSession() {
185 return cpsDataPersistenceService.startSession();
189 public void closeSession(final String sessionId) {
190 cpsDataPersistenceService.closeSession(sessionId);
194 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
195 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
199 public void lockAnchor(final String sessionID, final String dataspaceName,
200 final String anchorName, final Long timeoutInMilliseconds) {
201 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
205 @Timed(value = "cps.data.service.datanode.descendants.update",
206 description = "Time taken to update a data node and descendants")
207 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
208 final String parentNodeXpath, final String jsonData,
209 final OffsetDateTime observedTimestamp) {
210 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
211 final Collection<DataNode> dataNodes =
212 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
213 final ArrayList<DataNode> nodes = new ArrayList<>(dataNodes);
214 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, nodes);
215 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
219 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
220 description = "Time taken to update a batch of data nodes and descendants")
221 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
222 final Map<String, String> nodesJsonData,
223 final OffsetDateTime observedTimestamp) {
224 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
225 final List<DataNode> dataNodes = buildDataNodes(dataspaceName, anchorName, nodesJsonData);
226 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
227 nodesJsonData.keySet().forEach(nodeXpath ->
228 processDataUpdatedEventAsync(dataspaceName, anchorName, nodeXpath,
229 UPDATE, observedTimestamp));
233 @Timed(value = "cps.data.service.list.update",
234 description = "Time taken to update a list")
235 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
236 final String jsonData, final OffsetDateTime observedTimestamp) {
237 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
238 final Collection<DataNode> newListElements =
239 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
240 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
244 @Timed(value = "cps.data.service.list.batch.update",
245 description = "Time taken to update a batch of lists")
246 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
247 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
248 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
249 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
250 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
254 @Timed(value = "cps.data.service.datanode.delete",
255 description = "Time taken to delete a datanode")
256 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
257 final OffsetDateTime observedTimestamp) {
258 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
259 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
260 processDataUpdatedEventAsync(dataspaceName, anchorName, dataNodeXpath, DELETE, observedTimestamp);
264 @Timed(value = "cps.data.service.datanode.batch.delete",
265 description = "Time taken to delete a batch of datanodes")
266 public void deleteDataNodes(final String dataspaceName, final String anchorName,
267 final OffsetDateTime observedTimestamp) {
268 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
269 processDataUpdatedEventAsync(dataspaceName, anchorName, ROOT_NODE_XPATH, DELETE, observedTimestamp);
270 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
274 @Timed(value = "cps.data.service.list.delete",
275 description = "Time taken to delete a list or list element")
276 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
277 final OffsetDateTime observedTimestamp) {
278 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
279 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
280 processDataUpdatedEventAsync(dataspaceName, anchorName, listNodeXpath, DELETE, observedTimestamp);
283 private DataNode buildDataNode(final String dataspaceName, final String anchorName,
284 final String parentNodeXpath, final String nodeData,
285 final ContentType contentType) {
287 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
288 final SchemaContext schemaContext = getSchemaContext(dataspaceName, anchor.getSchemaSetName());
290 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
291 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
292 return new DataNodeBuilder().withContainerNode(containerNode).build();
295 final ContainerNode containerNode =
296 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
298 return new DataNodeBuilder()
299 .withParentNodeXpath(parentNodeXpath)
300 .withContainerNode(containerNode)
304 private List<DataNode> buildDataNodes(final String dataspaceName, final String anchorName,
305 final Map<String, String> nodesJsonData) {
306 return nodesJsonData.entrySet().stream().map(nodeJsonData ->
307 buildDataNode(dataspaceName, anchorName, nodeJsonData.getKey(),
308 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
311 private Collection<DataNode> buildDataNodes(final String dataspaceName,
312 final String anchorName,
313 final String parentNodeXpath,
314 final String nodeData,
315 final ContentType contentType) {
317 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
318 final SchemaContext schemaContext = getSchemaContext(dataspaceName, anchor.getSchemaSetName());
320 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
321 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
322 final Collection<DataNode> dataNodes = new DataNodeBuilder()
323 .withContainerNode(containerNode)
325 if (dataNodes.isEmpty()) {
326 throw new DataValidationException("Invalid data.", "No data nodes provided");
330 final ContainerNode containerNode =
331 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
332 final Collection<DataNode> dataNodes = new DataNodeBuilder()
333 .withParentNodeXpath(parentNodeXpath)
334 .withContainerNode(containerNode)
336 if (dataNodes.isEmpty()) {
337 throw new DataValidationException("Invalid data.", "No data nodes provided");
342 private Collection<Collection<DataNode>> buildDataNodes(final String dataspaceName, final String anchorName,
343 final String parentNodeXpath, final Collection<String> nodeDataList, final ContentType contentType) {
344 return nodeDataList.stream()
345 .map(nodeData -> buildDataNodes(dataspaceName, anchorName, parentNodeXpath, nodeData, contentType))
346 .collect(Collectors.toList());
349 private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName, final String xpath,
350 final Operation operation, final OffsetDateTime observedTimestamp) {
352 notificationService.processDataUpdatedEvent(dataspaceName, anchorName, xpath, operation, observedTimestamp);
353 } catch (final Exception exception) {
354 //If async message can't be queued for notification service, the initial request should not failed.
355 log.error("Failed to send message to notification service", exception);
359 private SchemaContext getSchemaContext(final String dataspaceName, final String schemaSetName) {
360 return yangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName).getSchemaContext();
363 private void processDataNodeUpdate(final String dataspaceName, final String anchorName,
364 final DataNode dataNodeUpdate) {
365 if (dataNodeUpdate == null) {
368 cpsDataPersistenceService.updateDataLeaves(dataspaceName, anchorName, dataNodeUpdate.getXpath(),
369 dataNodeUpdate.getLeaves());
370 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
371 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
372 processDataNodeUpdate(dataspaceName, anchorName, childDataNodeUpdate);