2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2021-2023 Nordix Foundation
4 * Modifications Copyright (C) 2020-2022 Bell Canada.
5 * Modifications Copyright (C) 2021 Pantheon.tech
6 * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7 * Modifications Copyright (C) 2022 Deutsche Telekom AG
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * SPDX-License-Identifier: Apache-2.0
22 * ============LICENSE_END=========================================================
25 package org.onap.cps.api.impl;
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
31 import io.micrometer.core.annotation.Timed;
32 import java.time.OffsetDateTime;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.List;
37 import java.util.stream.Collectors;
38 import lombok.RequiredArgsConstructor;
39 import lombok.extern.slf4j.Slf4j;
40 import org.onap.cps.api.CpsAdminService;
41 import org.onap.cps.api.CpsDataService;
42 import org.onap.cps.notification.NotificationService;
43 import org.onap.cps.notification.Operation;
44 import org.onap.cps.spi.CpsDataPersistenceService;
45 import org.onap.cps.spi.FetchDescendantsOption;
46 import org.onap.cps.spi.exceptions.DataValidationException;
47 import org.onap.cps.spi.model.Anchor;
48 import org.onap.cps.spi.model.DataNode;
49 import org.onap.cps.spi.model.DataNodeBuilder;
50 import org.onap.cps.spi.utils.CpsValidator;
51 import org.onap.cps.utils.ContentType;
52 import org.onap.cps.utils.TimedYangParser;
53 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.springframework.stereotype.Service;
59 @RequiredArgsConstructor
60 public class CpsDataServiceImpl implements CpsDataService {
62 private static final String ROOT_NODE_XPATH = "/";
63 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
65 private final CpsDataPersistenceService cpsDataPersistenceService;
66 private final CpsAdminService cpsAdminService;
67 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
68 private final NotificationService notificationService;
69 private final CpsValidator cpsValidator;
70 private final TimedYangParser timedYangParser;
73 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
74 final OffsetDateTime observedTimestamp) {
75 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
79 @Timed(value = "cps.data.service.datanode.root.save",
80 description = "Time taken to save a root data node")
81 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
82 final OffsetDateTime observedTimestamp, final ContentType contentType) {
83 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
84 final Collection<DataNode> dataNodes =
85 buildDataNodes(dataspaceName, anchorName, ROOT_NODE_XPATH, nodeData, contentType);
86 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
87 processDataUpdatedEventAsync(dataspaceName, anchorName, ROOT_NODE_XPATH, CREATE, observedTimestamp);
91 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
92 final String nodeData, final OffsetDateTime observedTimestamp) {
93 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
97 @Timed(value = "cps.data.service.datanode.child.save",
98 description = "Time taken to save a child data node")
99 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
100 final String nodeData, final OffsetDateTime observedTimestamp,
101 final ContentType contentType) {
102 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
103 final Collection<DataNode> dataNodes =
104 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, nodeData, contentType);
105 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
106 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, CREATE, observedTimestamp);
110 @Timed(value = "cps.data.service.list.element.save",
111 description = "Time taken to save a list element")
112 public void saveListElements(final String dataspaceName, final String anchorName,
113 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
114 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
115 final Collection<DataNode> listElementDataNodeCollection =
116 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
117 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
118 listElementDataNodeCollection);
119 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
123 @Timed(value = "cps.data.service.list.element.batch.save",
124 description = "Time taken to save a batch of list elements")
125 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
126 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
127 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
128 final Collection<Collection<DataNode>> listElementDataNodeCollections =
129 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonDataList, ContentType.JSON);
130 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
131 listElementDataNodeCollections);
132 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
136 @Timed(value = "cps.data.service.datanode.get",
137 description = "Time taken to get data nodes for an xpath")
138 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
140 final FetchDescendantsOption fetchDescendantsOption) {
141 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
142 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
146 @Timed(value = "cps.data.service.datanode.batch.get",
147 description = "Time taken to get a batch of data nodes")
148 public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
149 final Collection<String> xpaths,
150 final FetchDescendantsOption fetchDescendantsOption) {
151 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
152 return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
153 fetchDescendantsOption);
157 @Timed(value = "cps.data.service.datanode.leaves.update",
158 description = "Time taken to get a batch of data nodes")
159 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
160 final String jsonData, final OffsetDateTime observedTimestamp) {
161 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
162 final DataNode dataNode = buildDataNode(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
163 cpsDataPersistenceService
164 .updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves());
165 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
169 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
170 description = "Time taken to update data node leaves and existing descendants leaves")
171 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
172 final String parentNodeXpath,
173 final String dataNodeUpdatesAsJson,
174 final OffsetDateTime observedTimestamp) {
175 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
176 final Collection<DataNode> dataNodeUpdates =
177 buildDataNodes(dataspaceName, anchorName,
178 parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
179 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
180 processDataNodeUpdate(dataspaceName, anchorName, dataNodeUpdate);
182 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
186 public String startSession() {
187 return cpsDataPersistenceService.startSession();
191 public void closeSession(final String sessionId) {
192 cpsDataPersistenceService.closeSession(sessionId);
196 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
197 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
201 public void lockAnchor(final String sessionID, final String dataspaceName,
202 final String anchorName, final Long timeoutInMilliseconds) {
203 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
207 @Timed(value = "cps.data.service.datanode.descendants.update",
208 description = "Time taken to update a data node and descendants")
209 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
210 final String parentNodeXpath, final String jsonData,
211 final OffsetDateTime observedTimestamp) {
212 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
213 final Collection<DataNode> dataNodes =
214 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
215 final ArrayList<DataNode> nodes = new ArrayList<>(dataNodes);
216 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, nodes);
217 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
221 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
222 description = "Time taken to update a batch of data nodes and descendants")
223 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
224 final Map<String, String> nodesJsonData,
225 final OffsetDateTime observedTimestamp) {
226 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
227 final List<DataNode> dataNodes = buildDataNodes(dataspaceName, anchorName, nodesJsonData);
228 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
229 nodesJsonData.keySet().forEach(nodeXpath ->
230 processDataUpdatedEventAsync(dataspaceName, anchorName, nodeXpath,
231 UPDATE, observedTimestamp));
235 @Timed(value = "cps.data.service.list.update",
236 description = "Time taken to update a list")
237 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
238 final String jsonData, final OffsetDateTime observedTimestamp) {
239 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
240 final Collection<DataNode> newListElements =
241 buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData, ContentType.JSON);
242 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
246 @Timed(value = "cps.data.service.list.batch.update",
247 description = "Time taken to update a batch of lists")
248 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
249 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
250 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
251 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
252 processDataUpdatedEventAsync(dataspaceName, anchorName, parentNodeXpath, UPDATE, observedTimestamp);
256 @Timed(value = "cps.data.service.datanode.delete",
257 description = "Time taken to delete a datanode")
258 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
259 final OffsetDateTime observedTimestamp) {
260 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
261 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
262 processDataUpdatedEventAsync(dataspaceName, anchorName, dataNodeXpath, DELETE, observedTimestamp);
266 @Timed(value = "cps.data.service.datanode.batch.delete",
267 description = "Time taken to delete a batch of datanodes")
268 public void deleteDataNodes(final String dataspaceName, final String anchorName,
269 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
270 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
271 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
272 dataNodeXpaths.forEach(dataNodeXpath ->
273 processDataUpdatedEventAsync(dataspaceName, anchorName, dataNodeXpath, DELETE, observedTimestamp));
277 @Timed(value = "cps.data.service.datanode.all.delete",
278 description = "Time taken to delete all datanodes")
279 public void deleteDataNodes(final String dataspaceName, final String anchorName,
280 final OffsetDateTime observedTimestamp) {
281 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
282 processDataUpdatedEventAsync(dataspaceName, anchorName, ROOT_NODE_XPATH, DELETE, observedTimestamp);
283 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
287 @Timed(value = "cps.data.service.list.delete",
288 description = "Time taken to delete a list or list element")
289 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
290 final OffsetDateTime observedTimestamp) {
291 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
292 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
293 processDataUpdatedEventAsync(dataspaceName, anchorName, listNodeXpath, DELETE, observedTimestamp);
296 private DataNode buildDataNode(final String dataspaceName, final String anchorName,
297 final String parentNodeXpath, final String nodeData,
298 final ContentType contentType) {
300 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
301 final SchemaContext schemaContext = getSchemaContext(dataspaceName, anchor.getSchemaSetName());
303 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
304 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
305 return new DataNodeBuilder().withContainerNode(containerNode).build();
308 final ContainerNode containerNode =
309 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
311 return new DataNodeBuilder()
312 .withParentNodeXpath(parentNodeXpath)
313 .withContainerNode(containerNode)
317 private List<DataNode> buildDataNodes(final String dataspaceName, final String anchorName,
318 final Map<String, String> nodesJsonData) {
319 return nodesJsonData.entrySet().stream().map(nodeJsonData ->
320 buildDataNode(dataspaceName, anchorName, nodeJsonData.getKey(),
321 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
324 private Collection<DataNode> buildDataNodes(final String dataspaceName,
325 final String anchorName,
326 final String parentNodeXpath,
327 final String nodeData,
328 final ContentType contentType) {
330 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
331 final SchemaContext schemaContext = getSchemaContext(dataspaceName, anchor.getSchemaSetName());
333 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
334 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
335 final Collection<DataNode> dataNodes = new DataNodeBuilder()
336 .withContainerNode(containerNode)
338 if (dataNodes.isEmpty()) {
339 throw new DataValidationException("Invalid data.", "No data nodes provided");
343 final ContainerNode containerNode =
344 timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
345 final Collection<DataNode> dataNodes = new DataNodeBuilder()
346 .withParentNodeXpath(parentNodeXpath)
347 .withContainerNode(containerNode)
349 if (dataNodes.isEmpty()) {
350 throw new DataValidationException("Invalid data.", "No data nodes provided");
355 private Collection<Collection<DataNode>> buildDataNodes(final String dataspaceName, final String anchorName,
356 final String parentNodeXpath, final Collection<String> nodeDataList, final ContentType contentType) {
357 return nodeDataList.stream()
358 .map(nodeData -> buildDataNodes(dataspaceName, anchorName, parentNodeXpath, nodeData, contentType))
359 .collect(Collectors.toList());
362 private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName, final String xpath,
363 final Operation operation, final OffsetDateTime observedTimestamp) {
365 notificationService.processDataUpdatedEvent(dataspaceName, anchorName, xpath, operation, observedTimestamp);
366 } catch (final Exception exception) {
367 //If async message can't be queued for notification service, the initial request should not failed.
368 log.error("Failed to send message to notification service", exception);
372 private SchemaContext getSchemaContext(final String dataspaceName, final String schemaSetName) {
373 return yangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName).getSchemaContext();
376 private void processDataNodeUpdate(final String dataspaceName, final String anchorName,
377 final DataNode dataNodeUpdate) {
378 if (dataNodeUpdate == null) {
381 cpsDataPersistenceService.updateDataLeaves(dataspaceName, anchorName, dataNodeUpdate.getXpath(),
382 dataNodeUpdate.getLeaves());
383 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
384 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
385 processDataNodeUpdate(dataspaceName, anchorName, childDataNodeUpdate);