/*
 * Copyright © 2016-2017 European Support Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package org.openecomp.core.zusammen.plugin.dao.impl.cassandra;
import com.amdocs.zusammen.datatypes.Id;
import com.amdocs.zusammen.datatypes.SessionContext;
import com.amdocs.zusammen.datatypes.item.Action;
import com.amdocs.zusammen.plugin.statestore.cassandra.dao.types.ElementEntityContext;
import com.amdocs.zusammen.utils.fileutils.json.JsonUtil;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.mapping.annotations.Accessor;
import com.datastax.driver.mapping.annotations.Param;
import com.datastax.driver.mapping.annotations.Query;
import org.openecomp.core.zusammen.plugin.dao.ElementStageRepository;
import org.openecomp.core.zusammen.plugin.dao.types.ElementEntity;
import org.openecomp.core.zusammen.plugin.dao.types.StageEntity;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
42 public class ElementStageRepositoryImpl implements ElementStageRepository {
45 public Collection<ElementEntity> listIds(SessionContext context,
46 ElementEntityContext elementContext) {
47 return getElements(getStageElementIds(context, elementContext));
51 public Collection<ElementEntity> listConflictedIds(SessionContext context,
52 ElementEntityContext elementContext) {
53 return getElements(getConflictedElementIds(context, elementContext));
57 public void create(SessionContext context, ElementEntityContext elementContext,
58 StageEntity<ElementEntity> elementStage) {
59 createElement(context, elementContext, elementStage);
60 addElementToParent(context, elementContext, elementStage.getEntity());
64 public void markAsNotConflicted(SessionContext context, ElementEntityContext elementContext,
65 ElementEntity element, Action action) {
66 getElementStageAccessor(context).updateState(action, false,
67 elementContext.getSpace(),
68 elementContext.getItemId().toString(),
69 elementContext.getVersionId().toString(),
70 element.getId().toString());
72 getStageElementsAccessor(context).removeConflictElements(
73 Collections.singleton(element.getId().toString()),
74 elementContext.getSpace(),
75 elementContext.getItemId().toString(),
76 elementContext.getVersionId().toString(),
77 elementContext.getRevisionId().getValue());
81 public void markAsNotConflicted(SessionContext context, ElementEntityContext elementContext,
82 ElementEntity element) {
83 getElementStageAccessor(context).markAsNotConflicted(
84 elementContext.getSpace(),
85 elementContext.getItemId().toString(),
86 elementContext.getVersionId().toString(),
87 element.getId().toString());
89 getStageElementsAccessor(context).removeConflictElements(
90 Collections.singleton(element.getId().toString()),
91 elementContext.getSpace(),
92 elementContext.getItemId().toString(),
93 elementContext.getVersionId().toString(),
94 elementContext.getRevisionId().getValue());
98 public void update(SessionContext context, ElementEntityContext elementContext,
99 ElementEntity element, Action action, boolean conflicted) {
100 getElementStageAccessor(context).update(
101 JsonUtil.object2Json(element.getInfo()),
102 JsonUtil.object2Json(element.getRelations()),
104 element.getSearchableData(),
105 element.getVisualization(),
106 element.getElementHash().getValue(),
109 elementContext.getSpace(),
110 elementContext.getItemId().toString(),
111 elementContext.getVersionId().toString(),
112 element.getId().toString());
115 getStageElementsAccessor(context).removeConflictElements(
116 Collections.singleton(element.getId().toString()),
117 elementContext.getSpace(),
118 elementContext.getItemId().toString(),
119 elementContext.getVersionId().toString(),
120 elementContext.getRevisionId().getValue());
125 public void delete(SessionContext context, ElementEntityContext elementContext,
126 ElementEntity element) {
127 removeElementFromParent(context, elementContext, element);
128 deleteElement(context, elementContext, element);
132 public Optional<StageEntity<ElementEntity>> get(SessionContext context,
133 ElementEntityContext elementContext,
134 ElementEntity element) {
135 Row row = getElementStageAccessor(context).get(
136 elementContext.getSpace(),
137 elementContext.getItemId().toString(),
138 elementContext.getVersionId().getValue(),
139 element.getId().toString()).one();
141 return row == null ? Optional.empty() : Optional.of(getStageElement(row));
145 public Optional<StageEntity<ElementEntity>> getDescriptor(SessionContext context,
146 ElementEntityContext elementContext,
147 ElementEntity element) {
148 Row row = getElementStageAccessor(context).getDescriptor(
149 elementContext.getSpace(),
150 elementContext.getItemId().toString(),
151 elementContext.getVersionId().getValue(),
152 element.getId().toString()).one();
154 return row == null ? Optional.empty() : Optional.of(getStageElementDescriptor(row));
159 private Collection<ElementEntity> getElements(Set<String> elementIds) {
160 return elementIds.stream()
161 .map(id -> new ElementEntity(new Id(id)))
162 .collect(Collectors.toList());
165 private void createElement(SessionContext context, ElementEntityContext elementContext,
166 StageEntity<ElementEntity> elementStage) {
169 ElementEntity element = elementStage.getEntity();
170 Set<String> subElementIds =
171 element.getSubElementIds().stream().map(Id::toString).collect(Collectors.toSet());
172 Set<String> conflictDependents = elementStage.getConflictDependents().stream()
173 .map(conflictDependent -> conflictDependent.getId().getValue())
174 .collect(Collectors.toSet());
176 getElementStageAccessor(context).create(
177 elementContext.getSpace(),
178 elementContext.getItemId().toString(),
179 elementContext.getVersionId().getValue(),
180 element.getId().toString(),
181 element.getParentId() == null ? null : element.getParentId().toString(),
182 element.getNamespace() == null ? null : element.getNamespace().toString(),
183 JsonUtil.object2Json(element.getInfo()),
184 JsonUtil.object2Json(element.getRelations()),
186 element.getSearchableData(),
187 element.getVisualization(),
189 element.getElementHash() == null ? null : element.getElementHash().getValue(),
190 elementStage.getPublishTime(),
191 elementStage.getAction(),
192 elementStage.isConflicted(),
195 getStageElementsAccessor(context).add(
196 Collections.singleton(element.getId().toString()),
197 elementContext.getSpace(),
198 elementContext.getItemId().toString(),
199 elementContext.getVersionId().getValue(),
200 elementContext.getRevisionId().getValue());
202 if (elementStage.isConflicted()) {
203 getStageElementsAccessor(context).addConflictElements(
204 Collections.singleton(element.getId().toString()),
205 elementContext.getSpace(),
206 elementContext.getItemId().toString(),
207 elementContext.getVersionId().getValue(),
208 elementContext.getRevisionId().getValue());
212 private void deleteElement(SessionContext context, ElementEntityContext elementContext,
213 ElementEntity element) {
216 getElementStageAccessor(context).delete(
217 elementContext.getSpace(),
218 elementContext.getItemId().toString(),
219 elementContext.getVersionId().getValue(),
220 element.getId().toString());
222 getStageElementsAccessor(context).remove(
223 Collections.singleton(element.getId().toString()),
224 elementContext.getSpace(),
225 elementContext.getItemId().toString(),
226 elementContext.getVersionId().getValue(),
227 elementContext.getRevisionId().getValue());
230 private void addElementToParent(SessionContext context, ElementEntityContext elementContext,
231 ElementEntity element) {
232 if (element.getParentId() == null) {
235 getElementStageAccessor(context).addSubElements(
236 Collections.singleton(element.getId().toString()),
237 elementContext.getSpace(),
238 elementContext.getItemId().toString(),
239 elementContext.getVersionId().getValue(),
240 element.getParentId().toString());
243 private void removeElementFromParent(SessionContext context, ElementEntityContext elementContext,
244 ElementEntity element) {
245 if (element.getParentId() == null) {
248 getElementStageAccessor(context).removeSubElements(
249 Collections.singleton(element.getId().toString()),
250 elementContext.getSpace(),
251 elementContext.getItemId().toString(),
252 elementContext.getVersionId().getValue(),
253 element.getParentId().toString());
256 private StageEntity<ElementEntity> getStageElementDescriptor(Row row) {
257 return buildStageElement(ElementRepositoryImpl.getElementEntityDescriptor(
258 new ElementEntity(new Id(row.getString(ElementStageField.ID))), row), row);
261 private StageEntity<ElementEntity> getStageElement(Row row) {
262 return buildStageElement(ElementRepositoryImpl.getElementEntity(
263 new ElementEntity(new Id(row.getString(ElementStageField.ID))), row), row);
267 private StageEntity<ElementEntity> buildStageElement(ElementEntity element, Row row) {
268 StageEntity<ElementEntity> elementStage =
269 new StageEntity<>(element, row.getDate(ElementStageField.PUBLISH_TIME));
270 elementStage.setAction(Action.valueOf(row.getString(ElementStageField.ACTION)));
271 elementStage.setConflicted(row.getBool(ElementStageField.CONFLICTED));
272 elementStage.setConflictDependents(
273 row.getSet(ElementStageField.CONFLICT_DEPENDENTS, String.class).stream()
274 .map(conflictDependentId -> new ElementEntity(new Id(conflictDependentId)))
275 .collect(Collectors.toSet()));
279 private Set<String> getStageElementIds(SessionContext context,
280 ElementEntityContext elementContext) {
281 Row row = getStageElementsAccessor(context).get(
282 elementContext.getSpace(),
283 elementContext.getItemId().toString(),
284 elementContext.getVersionId().getValue(),
285 elementContext.getRevisionId().getValue()).one();
286 return row == null ? new HashSet<>()
287 : row.getSet(StageElementsField.STAGE_ELEMENT_IDS, String.class);
290 private Set<String> getConflictedElementIds(SessionContext context,
291 ElementEntityContext elementContext) {
292 Row row = getStageElementsAccessor(context).getConflicted(
293 elementContext.getSpace(),
294 elementContext.getItemId().toString(),
295 elementContext.getVersionId().getValue(),
296 elementContext.getRevisionId().getValue()).one();
297 return row == null ? new HashSet<>()
298 : row.getSet(StageElementsField.CONFLICT_ELEMENT_IDS, String.class);
301 private ElementStageAccessor getElementStageAccessor(SessionContext context) {
302 return CassandraDaoUtils.getAccessor(context, ElementStageAccessor.class);
305 private StageElementsAccessor getStageElementsAccessor(SessionContext context) {
306 return CassandraDaoUtils.getAccessor(context, StageElementsAccessor.class);
310 interface ElementStageAccessor {
312 "UPDATE element_stage SET parent_id=:parentId, namespace=:ns, info=:info, relations=:rels, " +
313 "data=:data, searchable_data=:searchableData, visualization=:visualization, " +
314 "publish_time=:publishTime, action=:action, " +
315 "conflicted=:conflicted, conflict_dependent_ids=:conflictDependents, " +
316 "sub_element_ids=sub_element_ids+:subs, element_hash=:elementHash " +
317 "WHERE space=:space AND item_id=:item AND version_id=:ver AND element_id=:id ")
318 void create(@Param("space") String space,
319 @Param("item") String itemId,
320 @Param("ver") String versionId,
321 @Param("id") String elementId,
322 @Param("parentId") String parentElementId,
323 @Param("ns") String namespace,
324 @Param("info") String info,
325 @Param("rels") String relations,
326 @Param("data") ByteBuffer data,
327 @Param("searchableData") ByteBuffer searchableData,
328 @Param("visualization") ByteBuffer visualization,
329 @Param("subs") Set<String> subElementIds,
330 @Param("elementHash") String elementHash,
331 @Param("publishTime") Date publishTime,
332 @Param("action") Action action,
333 @Param("conflicted") boolean conflicted,
334 @Param("conflictDependents") Set<String> conflictDependents);
336 @Query("UPDATE element_stage SET info=?, relations=?, data=?, searchable_data=?, " +
337 "visualization=?,element_hash=?, action=?, conflicted=? " +
338 " WHERE space=? AND item_id=? AND version_id=? AND element_id=? ")
339 void update(String info, String relations, ByteBuffer data, ByteBuffer searchableData,
340 ByteBuffer visualization, String elementHash, Action action, boolean conflicted,
342 String itemId, String versionId, String elementId);
344 @Query("UPDATE element_stage SET action=?, conflicted=? " +
345 " WHERE space=? AND item_id=? AND version_id=? AND element_id=? ")
346 void updateState(Action action, boolean conflicted, String space, String itemId,
347 String versionId, String elementId);
349 @Query("UPDATE element_stage SET conflicted=false " +
350 " WHERE space=? AND item_id=? AND version_id=? AND element_id=? ")
351 void markAsNotConflicted(String space, String itemId, String versionId, String elementId);
354 "DELETE FROM element_stage WHERE space=? AND item_id=? AND version_id=? AND element_id=?")
355 void delete(String space, String itemId, String versionId, String elementId);
357 @Query("SELECT element_id, parent_id, namespace, info, relations, data, searchable_data, " +
358 "visualization, sub_element_ids,element_hash, publish_time, action, " +
359 "conflicted, conflict_dependent_ids FROM element_stage " +
360 "WHERE space=? AND item_id=? AND version_id=? AND element_id=? ")
361 ResultSet get(String space, String itemId, String versionId, String elementId);
363 @Query("SELECT element_id, parent_id, namespace, info, relations, " +
364 "sub_element_ids, publish_time, action, conflicted, conflict_dependent_ids " +
365 "FROM element_stage WHERE space=? AND item_id=? AND version_id=? AND element_id=? ")
366 ResultSet getDescriptor(String space, String itemId, String versionId, String elementId);
368 @Query("UPDATE element_stage SET sub_element_ids=sub_element_ids+? " +
369 " WHERE space=? AND item_id=? AND version_id=? AND element_id=? ")
370 void addSubElements(Set<String> subElementIds, String space, String itemId, String versionId,
373 @Query("UPDATE element_stage SET sub_element_ids=sub_element_ids-? " +
374 " WHERE space=? AND item_id=? AND version_id=? AND element_id=? ")
375 void removeSubElements(Set<String> subElementIds, String space, String itemId, String versionId,
379 private static final class ElementStageField {
380 private static final String ID = "element_id";
381 private static final String PUBLISH_TIME = "publish_time";
382 private static final String ACTION = "action";
383 private static final String CONFLICTED = "conflicted";
384 private static final String CONFLICT_DEPENDENTS = "conflict_dependent_ids";
388 interface StageElementsAccessor {
390 @Query("UPDATE version_elements SET stage_element_ids=stage_element_ids+? " +
391 "WHERE space=? AND item_id=? AND version_id=? AND revision_id=? ")
392 void add(Set<String> elementIds, String space, String itemId, String versionId, String
395 @Query("UPDATE version_elements SET stage_element_ids=stage_element_ids-? " +
396 "WHERE space=? AND item_id=? AND version_id=? AND revision_id=? ")
397 void remove(Set<String> elementIds, String space, String itemId, String versionId, String
400 @Query("SELECT stage_element_ids FROM version_elements " +
401 "WHERE space=? AND item_id=? AND version_id=? AND revision_id=?")
402 ResultSet get(String space, String itemId, String versionId, String revisionId);
404 @Query("UPDATE version_elements SET conflict_element_ids=conflict_element_ids+? " +
405 "WHERE space=? AND item_id=? AND version_id=? AND revision_id=? ")
406 void addConflictElements(Set<String> elementIds, String space, String itemId, String
407 versionId, String revisionId);
409 @Query("UPDATE version_elements SET conflict_element_ids=conflict_element_ids-? " +
410 "WHERE space=? AND item_id=? AND version_id=? AND revision_id=? ")
411 void removeConflictElements(Set<String> elementIds, String space, String itemId,
412 String versionId, String revisionId);
414 @Query("SELECT conflict_element_ids FROM version_elements " +
415 "WHERE space=? AND item_id=? AND version_id=? AND revision_id=? ")
416 ResultSet getConflicted(String space, String itemId, String versionId, String revisionId);
419 private static final class StageElementsField {
420 private static final String STAGE_ELEMENT_IDS = "stage_element_ids";
421 private static final String CONFLICT_ELEMENT_IDS = "conflict_element_ids";