/*
 * ============LICENSE_START=======================================================
 * Copyright (C) 2021-2023 Nordix Foundation
 * Modifications Copyright (C) 2020-2022 Bell Canada.
 * Modifications Copyright (C) 2021 Pantheon.tech
 * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
 * Modifications Copyright (C) 2022 Deutsche Telekom AG
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
package org.onap.cps.api.impl;

import static org.onap.cps.notification.Operation.CREATE;
import static org.onap.cps.notification.Operation.DELETE;
import static org.onap.cps.notification.Operation.UPDATE;

import io.micrometer.core.annotation.Timed;
import java.io.Serializable;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.CpsAdminService;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.api.CpsDeltaService;
import org.onap.cps.cpspath.parser.CpsPathUtil;
import org.onap.cps.notification.NotificationService;
import org.onap.cps.notification.Operation;
import org.onap.cps.spi.CpsDataPersistenceService;
import org.onap.cps.spi.FetchDescendantsOption;
import org.onap.cps.spi.exceptions.DataValidationException;
import org.onap.cps.spi.model.Anchor;
import org.onap.cps.spi.model.DataNode;
import org.onap.cps.spi.model.DataNodeBuilder;
import org.onap.cps.spi.model.DeltaReport;
import org.onap.cps.spi.utils.CpsValidator;
import org.onap.cps.utils.ContentType;
import org.onap.cps.utils.TimedYangParser;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.springframework.stereotype.Service;
64 @RequiredArgsConstructor
65 public class CpsDataServiceImpl implements CpsDataService {
67 private static final String ROOT_NODE_XPATH = "/";
68 private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
70 private final CpsDataPersistenceService cpsDataPersistenceService;
71 private final CpsAdminService cpsAdminService;
72 private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
73 private final NotificationService notificationService;
74 private final CpsValidator cpsValidator;
75 private final TimedYangParser timedYangParser;
76 private final CpsDeltaService cpsDeltaService;
79 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
80 final OffsetDateTime observedTimestamp) {
81 saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
85 @Timed(value = "cps.data.service.datanode.root.save",
86 description = "Time taken to save a root data node")
87 public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
88 final OffsetDateTime observedTimestamp, final ContentType contentType) {
89 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
90 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
91 final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
92 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
93 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
97 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
98 final String nodeData, final OffsetDateTime observedTimestamp) {
99 saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
103 @Timed(value = "cps.data.service.datanode.child.save",
104 description = "Time taken to save a child data node")
105 public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
106 final String nodeData, final OffsetDateTime observedTimestamp,
107 final ContentType contentType) {
108 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
109 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
110 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
111 cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
112 processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
116 @Timed(value = "cps.data.service.list.element.save",
117 description = "Time taken to save a list element")
118 public void saveListElements(final String dataspaceName, final String anchorName,
119 final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
120 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
121 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
122 final Collection<DataNode> listElementDataNodeCollection =
123 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
124 if (isRootNodeXpath(parentNodeXpath)) {
125 cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, listElementDataNodeCollection);
127 cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
128 listElementDataNodeCollection);
130 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
134 @Timed(value = "cps.data.service.list.element.batch.save",
135 description = "Time taken to save a batch of list elements")
136 public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
137 final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
138 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
139 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
140 final Collection<Collection<DataNode>> listElementDataNodeCollections =
141 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
142 cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
143 listElementDataNodeCollections);
144 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
148 @Timed(value = "cps.data.service.datanode.get",
149 description = "Time taken to get data nodes for an xpath")
150 public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
152 final FetchDescendantsOption fetchDescendantsOption) {
153 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
154 return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
158 @Timed(value = "cps.data.service.datanode.batch.get",
159 description = "Time taken to get a batch of data nodes")
160 public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
161 final Collection<String> xpaths,
162 final FetchDescendantsOption fetchDescendantsOption) {
163 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
164 return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
165 fetchDescendantsOption);
169 @Timed(value = "cps.data.service.datanode.leaves.update",
170 description = "Time taken to update a batch of leaf data nodes")
171 public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
172 final String jsonData, final OffsetDateTime observedTimestamp) {
173 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
174 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
175 final Collection<DataNode> dataNodesInPatch = buildDataNodes(anchor, parentNodeXpath, jsonData,
177 final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
178 .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
179 cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
180 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
184 @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
185 description = "Time taken to update data node leaves and existing descendants leaves")
186 public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
187 final String parentNodeXpath,
188 final String dataNodeUpdatesAsJson,
189 final OffsetDateTime observedTimestamp) {
190 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
191 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
192 final Collection<DataNode> dataNodeUpdates =
193 buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
194 for (final DataNode dataNodeUpdate : dataNodeUpdates) {
195 processDataNodeUpdate(anchor, dataNodeUpdate);
197 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
201 public String startSession() {
202 return cpsDataPersistenceService.startSession();
206 public void closeSession(final String sessionId) {
207 cpsDataPersistenceService.closeSession(sessionId);
211 public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
212 lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
216 public void lockAnchor(final String sessionID, final String dataspaceName,
217 final String anchorName, final Long timeoutInMilliseconds) {
218 cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
222 @Timed(value = "cps.data.service.get.delta",
223 description = "Time taken to get delta between anchors")
224 public List<DeltaReport> getDeltaByDataspaceAndAnchors(final String dataspaceName,
225 final String sourceAnchorName,
226 final String targetAnchorName, final String xpath,
227 final FetchDescendantsOption fetchDescendantsOption) {
229 final Collection<DataNode> sourceDataNodes = getDataNodesForMultipleXpaths(dataspaceName,
230 sourceAnchorName, Collections.singletonList(xpath), fetchDescendantsOption);
231 final Collection<DataNode> targetDataNodes = getDataNodesForMultipleXpaths(dataspaceName,
232 targetAnchorName, Collections.singletonList(xpath), fetchDescendantsOption);
234 return cpsDeltaService.getDeltaReports(sourceDataNodes, targetDataNodes);
238 @Timed(value = "cps.data.service.datanode.descendants.update",
239 description = "Time taken to update a data node and descendants")
240 public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
241 final String parentNodeXpath, final String jsonData,
242 final OffsetDateTime observedTimestamp) {
243 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
244 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
245 final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
246 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
247 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
251 @Timed(value = "cps.data.service.datanode.descendants.batch.update",
252 description = "Time taken to update a batch of data nodes and descendants")
253 public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
254 final Map<String, String> nodesJsonData,
255 final OffsetDateTime observedTimestamp) {
256 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
257 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
258 final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
259 cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
260 nodesJsonData.keySet().forEach(nodeXpath ->
261 processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
265 @Timed(value = "cps.data.service.list.update",
266 description = "Time taken to update a list")
267 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
268 final String jsonData, final OffsetDateTime observedTimestamp) {
269 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
270 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
271 final Collection<DataNode> newListElements =
272 buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
273 replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
277 @Timed(value = "cps.data.service.list.batch.update",
278 description = "Time taken to update a batch of lists")
279 public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
280 final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
281 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
282 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
283 cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
284 processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
288 @Timed(value = "cps.data.service.datanode.delete",
289 description = "Time taken to delete a datanode")
290 public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
291 final OffsetDateTime observedTimestamp) {
292 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
293 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
294 cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
295 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
299 @Timed(value = "cps.data.service.datanode.batch.delete",
300 description = "Time taken to delete a batch of datanodes")
301 public void deleteDataNodes(final String dataspaceName, final String anchorName,
302 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
303 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
304 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
305 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
306 dataNodeXpaths.forEach(dataNodeXpath ->
307 processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
311 @Timed(value = "cps.data.service.datanode.delete.anchor",
312 description = "Time taken to delete all datanodes for an anchor")
313 public void deleteDataNodes(final String dataspaceName, final String anchorName,
314 final OffsetDateTime observedTimestamp) {
315 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
316 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
317 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
318 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
322 @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
323 description = "Time taken to delete all datanodes for multiple anchors")
324 public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
325 final OffsetDateTime observedTimestamp) {
326 cpsValidator.validateNameCharacters(dataspaceName);
327 cpsValidator.validateNameCharacters(anchorNames);
328 for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
329 processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
331 cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
335 @Timed(value = "cps.data.service.list.delete",
336 description = "Time taken to delete a list or list element")
337 public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
338 final OffsetDateTime observedTimestamp) {
339 cpsValidator.validateNameCharacters(dataspaceName, anchorName);
340 final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
341 cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
342 processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
345 private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
346 final Collection<DataNode> dataNodes = new ArrayList<>();
347 for (final Map.Entry<String, String> nodeJsonData : nodesJsonData.entrySet()) {
348 dataNodes.addAll(buildDataNodes(anchor, nodeJsonData.getKey(), nodeJsonData.getValue(), ContentType.JSON));
353 private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
354 final String nodeData, final ContentType contentType) {
355 final SchemaContext schemaContext = getSchemaContext(anchor);
357 if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
358 final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
359 final Collection<DataNode> dataNodes = new DataNodeBuilder()
360 .withContainerNode(containerNode)
362 if (dataNodes.isEmpty()) {
363 throw new DataValidationException("No data nodes.", "No data nodes provided");
367 final String normalizedParentNodeXpath = CpsPathUtil.getNormalizedXpath(parentNodeXpath);
368 final ContainerNode containerNode =
369 timedYangParser.parseData(contentType, nodeData, schemaContext, normalizedParentNodeXpath);
370 final Collection<DataNode> dataNodes = new DataNodeBuilder()
371 .withParentNodeXpath(normalizedParentNodeXpath)
372 .withContainerNode(containerNode)
374 if (dataNodes.isEmpty()) {
375 throw new DataValidationException("No data nodes.", "No data nodes provided");
380 private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
381 final Collection<String> nodeDataList,
382 final ContentType contentType) {
383 return nodeDataList.stream()
384 .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
385 .collect(Collectors.toList());
388 private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
389 final Operation operation, final OffsetDateTime observedTimestamp) {
391 notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
392 } catch (final Exception exception) {
393 //If async message can't be queued for notification service, the initial request should not fail.
394 log.error("Failed to send message to notification service", exception);
398 private SchemaContext getSchemaContext(final Anchor anchor) {
399 return yangTextSchemaSourceSetCache
400 .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
403 private static boolean isRootNodeXpath(final String xpath) {
404 return ROOT_NODE_XPATH.equals(xpath);
407 private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
408 cpsDataPersistenceService.batchUpdateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
409 Collections.singletonMap(dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves()));
410 final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
411 for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
412 processDataNodeUpdate(anchor, childDataNodeUpdate);