DMI Data AVC to use kafka headers
[cps.git] / cps-service / src / main / java / org / onap / cps / api / impl / CpsDataServiceImpl.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2021-2023 Nordix Foundation
4  *  Modifications Copyright (C) 2020-2022 Bell Canada.
5  *  Modifications Copyright (C) 2021 Pantheon.tech
6  *  Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
7  *  Modifications Copyright (C) 2022 Deutsche Telekom AG
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *
13  *        http://www.apache.org/licenses/LICENSE-2.0
14  *
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *
21  *  SPDX-License-Identifier: Apache-2.0
22  *  ============LICENSE_END=========================================================
23  */
24
25 package org.onap.cps.api.impl;
26
27 import static org.onap.cps.notification.Operation.CREATE;
28 import static org.onap.cps.notification.Operation.DELETE;
29 import static org.onap.cps.notification.Operation.UPDATE;
30
31 import io.micrometer.core.annotation.Timed;
32 import java.time.OffsetDateTime;
33 import java.util.Collection;
34 import java.util.Map;
35 import java.util.stream.Collectors;
36 import lombok.RequiredArgsConstructor;
37 import lombok.extern.slf4j.Slf4j;
38 import org.onap.cps.api.CpsAdminService;
39 import org.onap.cps.api.CpsDataService;
40 import org.onap.cps.notification.NotificationService;
41 import org.onap.cps.notification.Operation;
42 import org.onap.cps.spi.CpsDataPersistenceService;
43 import org.onap.cps.spi.FetchDescendantsOption;
44 import org.onap.cps.spi.exceptions.DataValidationException;
45 import org.onap.cps.spi.model.Anchor;
46 import org.onap.cps.spi.model.DataNode;
47 import org.onap.cps.spi.model.DataNodeBuilder;
48 import org.onap.cps.spi.utils.CpsValidator;
49 import org.onap.cps.utils.ContentType;
50 import org.onap.cps.utils.TimedYangParser;
51 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.springframework.stereotype.Service;
54
55 @Service
56 @Slf4j
57 @RequiredArgsConstructor
58 public class CpsDataServiceImpl implements CpsDataService {
59
60     private static final String ROOT_NODE_XPATH = "/";
61     private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
62
63     private final CpsDataPersistenceService cpsDataPersistenceService;
64     private final CpsAdminService cpsAdminService;
65     private final YangTextSchemaSourceSetCache yangTextSchemaSourceSetCache;
66     private final NotificationService notificationService;
67     private final CpsValidator cpsValidator;
68     private final TimedYangParser timedYangParser;
69
70     @Override
71     public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
72         final OffsetDateTime observedTimestamp) {
73         saveData(dataspaceName, anchorName, nodeData, observedTimestamp, ContentType.JSON);
74     }
75
76     @Override
77     @Timed(value = "cps.data.service.datanode.root.save",
78         description = "Time taken to save a root data node")
79     public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
80                          final OffsetDateTime observedTimestamp, final ContentType contentType) {
81         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
82         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
83         final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
84         cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
85         processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, CREATE, observedTimestamp);
86     }
87
88     @Override
89     public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
90                          final String nodeData, final OffsetDateTime observedTimestamp) {
91         saveData(dataspaceName, anchorName, parentNodeXpath, nodeData, observedTimestamp, ContentType.JSON);
92     }
93
94     @Override
95     @Timed(value = "cps.data.service.datanode.child.save",
96         description = "Time taken to save a child data node")
97     public void saveData(final String dataspaceName, final String anchorName, final String parentNodeXpath,
98                          final String nodeData, final OffsetDateTime observedTimestamp,
99                          final ContentType contentType) {
100         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
101         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
102         final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
103         cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
104         processDataUpdatedEventAsync(anchor, parentNodeXpath, CREATE, observedTimestamp);
105     }
106
107     @Override
108     @Timed(value = "cps.data.service.list.element.save",
109         description = "Time taken to save a list element")
110     public void saveListElements(final String dataspaceName, final String anchorName,
111         final String parentNodeXpath, final String jsonData, final OffsetDateTime observedTimestamp) {
112         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
113         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
114         final Collection<DataNode> listElementDataNodeCollection =
115             buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
116         cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
117             listElementDataNodeCollection);
118         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
119     }
120
121     @Override
122     @Timed(value = "cps.data.service.list.element.batch.save",
123         description = "Time taken to save a batch of list elements")
124     public void saveListElementsBatch(final String dataspaceName, final String anchorName, final String parentNodeXpath,
125             final Collection<String> jsonDataList, final OffsetDateTime observedTimestamp) {
126         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
127         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
128         final Collection<Collection<DataNode>> listElementDataNodeCollections =
129                 buildDataNodes(anchor, parentNodeXpath, jsonDataList, ContentType.JSON);
130         cpsDataPersistenceService.addMultipleLists(dataspaceName, anchorName, parentNodeXpath,
131                 listElementDataNodeCollections);
132         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
133     }
134
135     @Override
136     @Timed(value = "cps.data.service.datanode.get",
137             description = "Time taken to get data nodes for an xpath")
138     public Collection<DataNode> getDataNodes(final String dataspaceName, final String anchorName,
139                                              final String xpath,
140                                              final FetchDescendantsOption fetchDescendantsOption) {
141         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
142         return cpsDataPersistenceService.getDataNodes(dataspaceName, anchorName, xpath, fetchDescendantsOption);
143     }
144
145     @Override
146     @Timed(value = "cps.data.service.datanode.batch.get",
147         description = "Time taken to get a batch of data nodes")
148     public Collection<DataNode> getDataNodesForMultipleXpaths(final String dataspaceName, final String anchorName,
149                                                               final Collection<String> xpaths,
150                                                               final FetchDescendantsOption fetchDescendantsOption) {
151         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
152         return cpsDataPersistenceService.getDataNodesForMultipleXpaths(dataspaceName, anchorName, xpaths,
153                 fetchDescendantsOption);
154     }
155
156     @Override
157     @Timed(value = "cps.data.service.datanode.leaves.update",
158         description = "Time taken to get a batch of data nodes")
159     public void updateNodeLeaves(final String dataspaceName, final String anchorName, final String parentNodeXpath,
160         final String jsonData, final OffsetDateTime observedTimestamp) {
161         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
162         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
163         final Collection<DataNode> dataNodesInPatch = buildDataNodes(anchor, parentNodeXpath, jsonData,
164                 ContentType.JSON);
165         if (dataNodesInPatch.size() > 1) {
166             throw new DataValidationException("Operation is not supported for multiple data nodes",
167                     "Number of data nodes present: " + dataNodesInPatch.size());
168         }
169         cpsDataPersistenceService.updateDataLeaves(dataspaceName, anchorName,
170                 dataNodesInPatch.iterator().next().getXpath(),
171             dataNodesInPatch.iterator().next().getLeaves());
172         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
173     }
174
175     @Override
176     @Timed(value = "cps.data.service.datanode.leaves.descendants.leaves.update",
177         description = "Time taken to update data node leaves and existing descendants leaves")
178     public void updateNodeLeavesAndExistingDescendantLeaves(final String dataspaceName, final String anchorName,
179         final String parentNodeXpath,
180         final String dataNodeUpdatesAsJson,
181         final OffsetDateTime observedTimestamp) {
182         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
183         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
184         final Collection<DataNode> dataNodeUpdates =
185             buildDataNodes(anchor, parentNodeXpath, dataNodeUpdatesAsJson, ContentType.JSON);
186         for (final DataNode dataNodeUpdate : dataNodeUpdates) {
187             processDataNodeUpdate(anchor, dataNodeUpdate);
188         }
189         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
190     }
191
192     @Override
193     public String startSession() {
194         return cpsDataPersistenceService.startSession();
195     }
196
197     @Override
198     public void closeSession(final String sessionId) {
199         cpsDataPersistenceService.closeSession(sessionId);
200     }
201
202     @Override
203     public void lockAnchor(final String sessionID, final String dataspaceName, final String anchorName) {
204         lockAnchor(sessionID, dataspaceName, anchorName, DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS);
205     }
206
207     @Override
208     public void lockAnchor(final String sessionID, final String dataspaceName,
209                            final String anchorName, final Long timeoutInMilliseconds) {
210         cpsDataPersistenceService.lockAnchor(sessionID, dataspaceName, anchorName, timeoutInMilliseconds);
211     }
212
213     @Override
214     @Timed(value = "cps.data.service.datanode.descendants.update",
215         description = "Time taken to update a data node and descendants")
216     public void updateDataNodeAndDescendants(final String dataspaceName, final String anchorName,
217                                              final String parentNodeXpath, final String jsonData,
218                                              final OffsetDateTime observedTimestamp) {
219         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
220         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
221         final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
222         cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
223         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
224     }
225
226     @Override
227     @Timed(value = "cps.data.service.datanode.descendants.batch.update",
228         description = "Time taken to update a batch of data nodes and descendants")
229     public void updateDataNodesAndDescendants(final String dataspaceName, final String anchorName,
230                                               final Map<String, String> nodesJsonData,
231                                               final OffsetDateTime observedTimestamp) {
232         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
233         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
234         final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
235         cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
236         nodesJsonData.keySet().forEach(nodeXpath ->
237             processDataUpdatedEventAsync(anchor, nodeXpath, UPDATE, observedTimestamp));
238     }
239
240     @Override
241     @Timed(value = "cps.data.service.list.update",
242         description = "Time taken to update a list")
243     public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
244             final String jsonData, final OffsetDateTime observedTimestamp) {
245         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
246         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
247         final Collection<DataNode> newListElements =
248             buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
249         replaceListContent(dataspaceName, anchorName, parentNodeXpath, newListElements, observedTimestamp);
250     }
251
252     @Override
253     @Timed(value = "cps.data.service.list.batch.update",
254         description = "Time taken to update a batch of lists")
255     public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
256             final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
257         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
258         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
259         cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
260         processDataUpdatedEventAsync(anchor, parentNodeXpath, UPDATE, observedTimestamp);
261     }
262
263     @Override
264     @Timed(value = "cps.data.service.datanode.delete",
265         description = "Time taken to delete a datanode")
266     public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
267                                final OffsetDateTime observedTimestamp) {
268         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
269         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
270         cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
271         processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp);
272     }
273
274     @Override
275     @Timed(value = "cps.data.service.datanode.batch.delete",
276         description = "Time taken to delete a batch of datanodes")
277     public void deleteDataNodes(final String dataspaceName, final String anchorName,
278                                 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
279         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
280         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
281         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
282         dataNodeXpaths.forEach(dataNodeXpath ->
283             processDataUpdatedEventAsync(anchor, dataNodeXpath, DELETE, observedTimestamp));
284     }
285
286     @Override
287     @Timed(value = "cps.data.service.datanode.delete.anchor",
288         description = "Time taken to delete all datanodes for an anchor")
289     public void deleteDataNodes(final String dataspaceName, final String anchorName,
290                                 final OffsetDateTime observedTimestamp) {
291         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
292         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
293         processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
294         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
295     }
296
297     @Override
298     @Timed(value = "cps.data.service.datanode.delete.anchor.batch",
299         description = "Time taken to delete all datanodes for multiple anchors")
300     public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
301                                 final OffsetDateTime observedTimestamp) {
302         cpsValidator.validateNameCharacters(dataspaceName);
303         cpsValidator.validateNameCharacters(anchorNames);
304         for (final Anchor anchor : cpsAdminService.getAnchors(dataspaceName, anchorNames)) {
305             processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
306         }
307         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
308     }
309
310     @Override
311     @Timed(value = "cps.data.service.list.delete",
312         description = "Time taken to delete a list or list element")
313     public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
314         final OffsetDateTime observedTimestamp) {
315         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
316         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
317         cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
318         processDataUpdatedEventAsync(anchor, listNodeXpath, DELETE, observedTimestamp);
319     }
320
321     private DataNode buildDataNode(final Anchor anchor, final String parentNodeXpath, final String nodeData,
322                                    final ContentType contentType) {
323         final SchemaContext schemaContext = getSchemaContext(anchor);
324
325         if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
326             final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
327             return new DataNodeBuilder().withContainerNode(containerNode).build();
328         }
329
330         final ContainerNode containerNode =
331             timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
332
333         return new DataNodeBuilder()
334                 .withParentNodeXpath(parentNodeXpath)
335                 .withContainerNode(containerNode)
336                 .build();
337     }
338
339     private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
340         return nodesJsonData.entrySet().stream().map(nodeJsonData ->
341             buildDataNode(anchor, nodeJsonData.getKey(),
342                 nodeJsonData.getValue(), ContentType.JSON)).collect(Collectors.toList());
343     }
344
345     private Collection<DataNode> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
346                                                 final String nodeData, final ContentType contentType) {
347         final SchemaContext schemaContext = getSchemaContext(anchor);
348
349         if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
350             final ContainerNode containerNode = timedYangParser.parseData(contentType, nodeData, schemaContext);
351             final Collection<DataNode> dataNodes = new DataNodeBuilder()
352                     .withContainerNode(containerNode)
353                     .buildCollection();
354             if (dataNodes.isEmpty()) {
355                 throw new DataValidationException("Invalid data.", "No data nodes provided");
356             }
357             return dataNodes;
358         }
359         final ContainerNode containerNode =
360             timedYangParser.parseData(contentType, nodeData, schemaContext, parentNodeXpath);
361         final Collection<DataNode> dataNodes = new DataNodeBuilder()
362             .withParentNodeXpath(parentNodeXpath)
363             .withContainerNode(containerNode)
364             .buildCollection();
365         if (dataNodes.isEmpty()) {
366             throw new DataValidationException("Invalid data.", "No data nodes provided");
367         }
368         return dataNodes;
369     }
370
371     private Collection<Collection<DataNode>> buildDataNodes(final Anchor anchor, final String parentNodeXpath,
372                                                             final Collection<String> nodeDataList,
373                                                             final ContentType contentType) {
374         return nodeDataList.stream()
375             .map(nodeData -> buildDataNodes(anchor, parentNodeXpath, nodeData, contentType))
376             .collect(Collectors.toList());
377     }
378
379     private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath,
380                                               final Operation operation, final OffsetDateTime observedTimestamp) {
381         try {
382             notificationService.processDataUpdatedEvent(anchor, xpath, operation, observedTimestamp);
383         } catch (final Exception exception) {
384             //If async message can't be queued for notification service, the initial request should not failed.
385             log.error("Failed to send message to notification service", exception);
386         }
387     }
388
389     private SchemaContext getSchemaContext(final Anchor anchor) {
390         return yangTextSchemaSourceSetCache
391             .get(anchor.getDataspaceName(), anchor.getSchemaSetName()).getSchemaContext();
392     }
393
394     private void processDataNodeUpdate(final Anchor anchor, final DataNode dataNodeUpdate) {
395         if (dataNodeUpdate == null) {
396             return;
397         }
398         cpsDataPersistenceService.updateDataLeaves(anchor.getDataspaceName(), anchor.getName(),
399             dataNodeUpdate.getXpath(), dataNodeUpdate.getLeaves());
400         final Collection<DataNode> childDataNodeUpdates = dataNodeUpdate.getChildDataNodes();
401         for (final DataNode childDataNodeUpdate : childDataNodeUpdates) {
402             processDataNodeUpdate(anchor, childDataNodeUpdate);
403         }
404     }
405
406 }