Add collaboration feature
[sdc.git] / openecomp-be / lib / openecomp-core-lib / openecomp-zusammen-lib / openecomp-zusammen-plugin / src / main / java / org / openecomp / core / zusammen / plugin / main / CassandraCollaborationStorePluginImpl.java
1 /*
2  * Copyright © 2016-2017 European Support Limited
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.openecomp.core.zusammen.plugin.main;
18
19 import com.amdocs.zusammen.commons.health.data.HealthInfo;
20 import com.amdocs.zusammen.commons.health.data.HealthStatus;
21 import com.amdocs.zusammen.datatypes.Id;
22 import com.amdocs.zusammen.datatypes.Namespace;
23 import com.amdocs.zusammen.datatypes.SessionContext;
24 import com.amdocs.zusammen.datatypes.Space;
25 import com.amdocs.zusammen.datatypes.item.Action;
26 import com.amdocs.zusammen.datatypes.item.ElementContext;
27 import com.amdocs.zusammen.datatypes.item.Info;
28 import com.amdocs.zusammen.datatypes.item.ItemVersion;
29 import com.amdocs.zusammen.datatypes.item.ItemVersionData;
30 import com.amdocs.zusammen.datatypes.item.ItemVersionDataConflict;
31 import com.amdocs.zusammen.datatypes.item.ItemVersionStatus;
32 import com.amdocs.zusammen.datatypes.item.Resolution;
33 import com.amdocs.zusammen.datatypes.itemversion.ItemVersionRevisions;
34 import com.amdocs.zusammen.datatypes.itemversion.Revision;
35 import com.amdocs.zusammen.datatypes.itemversion.Tag;
36 import com.amdocs.zusammen.datatypes.response.ErrorCode;
37 import com.amdocs.zusammen.datatypes.response.Module;
38 import com.amdocs.zusammen.datatypes.response.Response;
39 import com.amdocs.zusammen.datatypes.response.ReturnCode;
40 import com.amdocs.zusammen.datatypes.response.ZusammenException;
41 import com.amdocs.zusammen.sdk.collaboration.CollaborationStore;
42 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationElement;
43 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationElementChange;
44 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationElementConflict;
45 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationItemVersionConflict;
46 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationMergeChange;
47 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationMergeResult;
48 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationPublishResult;
49 import com.amdocs.zusammen.sdk.types.ElementConflictDescriptor;
50 import com.amdocs.zusammen.sdk.types.ElementDescriptor;
51 import org.openecomp.core.zusammen.plugin.ZusammenPluginUtil;
52 import org.openecomp.core.zusammen.plugin.collaboration.CommitStagingService;
53 import org.openecomp.core.zusammen.plugin.collaboration.ElementPrivateStore;
54 import org.openecomp.core.zusammen.plugin.collaboration.ElementPublicStore;
55 import org.openecomp.core.zusammen.plugin.collaboration.ElementStageStore;
56 import org.openecomp.core.zusammen.plugin.collaboration.PublishService;
57 import org.openecomp.core.zusammen.plugin.collaboration.RevertService;
58 import org.openecomp.core.zusammen.plugin.collaboration.SyncService;
59 import org.openecomp.core.zusammen.plugin.collaboration.VersionPrivateStore;
60 import org.openecomp.core.zusammen.plugin.collaboration.VersionPublicStore;
61 import org.openecomp.core.zusammen.plugin.collaboration.VersionStageStore;
62 import org.openecomp.core.zusammen.plugin.collaboration.impl.ElementPrivateStoreImpl;
63 import org.openecomp.core.zusammen.plugin.collaboration.impl.ElementPublicStoreImpl;
64 import org.openecomp.core.zusammen.plugin.collaboration.impl.ElementStageStoreImpl;
65 import org.openecomp.core.zusammen.plugin.collaboration.impl.VersionPrivateStoreImpl;
66 import org.openecomp.core.zusammen.plugin.collaboration.impl.VersionPublicStoreImpl;
67 import org.openecomp.core.zusammen.plugin.collaboration.impl.VersionStageStoreImpl;
68 import org.openecomp.core.zusammen.plugin.dao.types.ElementEntity;
69 import org.openecomp.core.zusammen.plugin.dao.types.StageEntity;
70 import org.openecomp.core.zusammen.plugin.dao.types.SynchronizationStateEntity;
71 import org.openecomp.core.zusammen.plugin.dao.types.VersionDataElement;
72 import org.openecomp.core.zusammen.plugin.dao.types.VersionEntity;
73
74 import java.util.Collection;
75 import java.util.Date;
76 import java.util.List;
77 import java.util.Optional;
78 import java.util.stream.Collectors;
79
80 import static com.amdocs.zusammen.datatypes.item.SynchronizationStatus.MERGING;
81 import static com.amdocs.zusammen.datatypes.item.SynchronizationStatus.OUT_OF_SYNC;
82 import static com.amdocs.zusammen.datatypes.item.SynchronizationStatus.UP_TO_DATE;
83 import static org.openecomp.core.zusammen.plugin.ZusammenPluginConstants.ROOT_ELEMENTS_PARENT_ID;
84 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToCollaborationElement;
85 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToElementChange;
86 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToElementDescriptor;
87 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToElementEntity;
88 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToItemVersion;
89 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToVersionData;
90 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToVersionEntity;
91
92 public class CassandraCollaborationStorePluginImpl implements CollaborationStore {
93   // TODO: 8/15/2017 inject
94
95   private VersionPrivateStore versionPrivateStore = new VersionPrivateStoreImpl();
96   private VersionPublicStore versionPublicStore = new VersionPublicStoreImpl();
97   private VersionStageStore versionStageStore = new VersionStageStoreImpl();
98
99   private ElementPrivateStore elementPrivateStore = new ElementPrivateStoreImpl();
100   private ElementPublicStore elementPublicStore = new ElementPublicStoreImpl();
101   private ElementStageStore elementStageStore = new ElementStageStoreImpl();
102
103   // TODO: 9/4/2017
104   private CommitStagingService commitStagingService =
105       new CommitStagingService(versionPrivateStore, versionStageStore, elementPrivateStore,
106           elementStageStore);
107   private PublishService publishService =
108       new PublishService(versionPublicStore, versionPrivateStore, elementPublicStore,
109           elementPrivateStore);
110   private SyncService syncService =
111       new SyncService(versionPublicStore, versionPrivateStore, versionStageStore,
112           elementPublicStore, elementPrivateStore, elementStageStore);
113
114   private RevertService revertService =
115       new RevertService(elementPublicStore, elementPrivateStore);
116
117   @Override
118   public Response<Void> createItem(SessionContext context, Id itemId, Info info) {
119     // done by state store
120     return new Response(Void.TYPE);
121   }
122
123   @Override
124   public Response<Void> deleteItem(SessionContext context, Id itemId) {
125     // done by state store
126     return new Response(Void.TYPE);
127   }
128
129   @Override
130   public Response<Void> createItemVersion(SessionContext context, Id itemId, Id baseVersionId,
131                                           Id versionId, ItemVersionData itemVersionData) {
132     Date creationTime = new Date();
133     versionPrivateStore.create(context, itemId,
134         convertToVersionEntity(versionId, baseVersionId, creationTime, creationTime));
135
136     ElementContext elementContext = new ElementContext(itemId, versionId);
137     VersionDataElement versionData = new VersionDataElement(itemVersionData);
138
139     if (baseVersionId == null) {
140       elementPrivateStore.create(context, elementContext, versionData);
141     } else {
142       copyElements(context, new ElementContext(itemId, baseVersionId), elementContext);
143       elementPrivateStore.update(context, elementContext, versionData);
144     }
145
146     return new Response(Void.TYPE);
147   }
148
149   @Override
150   public Response<Void> updateItemVersion(SessionContext context, Id itemId, Id versionId,
151                                           ItemVersionData itemVersionData) {
152
153     if (elementPrivateStore.update(context, new ElementContext(itemId, versionId),
154         new VersionDataElement(itemVersionData))) {
155
156       VersionEntity version = new VersionEntity(versionId);
157       version.setModificationTime(new Date());
158       versionPrivateStore.update(context, itemId, version);
159     }
160
161     return new Response(Void.TYPE);
162   }
163
164   @Override
165   public Response<Void> deleteItemVersion(SessionContext context, Id itemId, Id versionId) {
166     elementPrivateStore
167         .delete(context, new ElementContext(itemId, versionId), new VersionDataElement());
168
169     versionPrivateStore.delete(context, itemId, new VersionEntity(versionId));
170     return new Response(Void.TYPE);
171   }
172
173   @Override
174   public Response<ItemVersionStatus> getItemVersionStatus(SessionContext context, Id itemId,
175                                                           Id versionId) {
176     if (versionStageStore.get(context, itemId, new VersionEntity(versionId)).isPresent()) {
177       return new Response<>(new ItemVersionStatus(MERGING, true));
178     }
179
180     Optional<SynchronizationStateEntity> publicSyncState =
181         versionPublicStore.getSynchronizationState(context, itemId, versionId);
182
183     if (!publicSyncState.isPresent()) {
184       return new Response<>(new ItemVersionStatus(UP_TO_DATE, true));
185     }
186
187     SynchronizationStateEntity privateSyncState =
188         versionPrivateStore.getSynchronizationState(context, itemId, versionId)
189             // TODO: 7/18/2017 ?
190             .orElseThrow(() -> new IllegalStateException("private version must exist"));
191
192     return new Response<>(new ItemVersionStatus(
193         privateSyncState.getPublishTime().equals(publicSyncState.get().getPublishTime())
194             ? UP_TO_DATE
195             : OUT_OF_SYNC,
196         privateSyncState.isDirty()));
197   }
198
199   @Override
200   public Response<Void> tagItemVersion(SessionContext context, Id itemId, Id versionId,
201                                        Id revisionId,
202                                        Tag tag) {
203    /* if (revisionId != null) {
204       throw new UnsupportedOperationException(
205           "In this plugin implementation tag is supported only on versionId");
206     }
207
208     copyElements(context,
209         new ElementContext(itemId, versionId),
210         new ElementContext(itemId, versionId, tag.getName()));*/
211
212     return new Response(Void.TYPE);
213   }
214
215   @Override
216   public Response<CollaborationPublishResult> publishItemVersion(SessionContext context,
217                                                                  Id itemId, Id versionId,
218                                                                  String message) {
219     try {
220       return new Response<>(publishService.publish(context, itemId, versionId, message));
221     } catch (ZusammenException ze) {
222       return new Response<>(
223           new ReturnCode(ErrorCode.CL_ITEM_VERSION_PUBLISH, Module.ZCSP, null, ze.getReturnCode()));
224     }
225   }
226
227   @Override
228   public Response<CollaborationMergeResult> syncItemVersion(SessionContext context, Id itemId,
229                                                             Id versionId) {
230     CollaborationMergeResult result = syncService.sync(context, itemId, versionId, false);
231     commitStagingService.commitStaging(context, itemId, versionId);
232
233     return new Response<>(result);
234   }
235
236   @Override
237   public Response<CollaborationMergeResult> forceSyncItemVersion(SessionContext context, Id itemId,
238                                                                  Id versionId) {
239     CollaborationMergeResult result = syncService.sync(context, itemId, versionId, true);
240     commitStagingService.commitStaging(context, itemId, versionId);
241
242     return new Response<>(result);
243   }
244
245   @Override
246   public Response<CollaborationMergeResult> mergeItemVersion(SessionContext context, Id itemId,
247                                                              Id versionId, Id sourceVersionId) {
248     throw new UnsupportedOperationException("mergeItemVersion");
249   }
250
251   @Override
252   public Response<CollaborationItemVersionConflict> getItemVersionConflict(SessionContext context,
253                                                                            Id itemId,
254                                                                            Id versionId) {
255     ElementContext elementContext = new ElementContext(itemId, versionId, Id.ZERO);
256
257     Collection<StageEntity<ElementEntity>> conflictedStagedElementDescriptors =
258         elementStageStore.listConflictedDescriptors(context, elementContext);
259
260     CollaborationItemVersionConflict result = new CollaborationItemVersionConflict();
261     for (StageEntity<ElementEntity> stagedElementDescriptor : conflictedStagedElementDescriptors) {
262       if (ROOT_ELEMENTS_PARENT_ID.equals(stagedElementDescriptor.getEntity().getId())) {
263         result.setVersionDataConflict(
264             getVersionDataConflict(context, elementContext, stagedElementDescriptor));
265       } else {
266         result.getElementConflictDescriptors()
267             .add(getElementConflictDescriptor(context, elementContext, stagedElementDescriptor));
268       }
269     }
270     return new Response<>(result);
271   }
272
273   @Override
274   public Response<ItemVersionRevisions> listItemVersionRevisions(SessionContext context, Id itemId,
275                                                                  Id versionId) {
276     return new Response<>(versionPublicStore.listItemVersionRevisions(context, itemId, versionId));
277   }
278
279   @Override
280   public Response<Revision> getItemVersionRevision(SessionContext context, Id itemId, Id versionId,
281                                                    Id revisionId) {
282     throw new UnsupportedOperationException(
283         "get revision is not supported in the current cassandra plugin");
284   }
285
286   @Override
287   public Response<CollaborationMergeChange> resetItemVersionRevision(SessionContext context,
288                                                                      Id itemId, Id versionId,
289                                                                      Id revisionId) {
290     throw new UnsupportedOperationException("resetItemVersionRevision function not supported");
291
292   }
293
294   @Override
295   public Response<CollaborationMergeChange> revertItemVersionRevision(SessionContext context,
296                                                                       Id itemId, Id versionId,
297                                                                       Id revisionId) {
298     Optional<ItemVersion> itemVersion = getItemVersion(context, itemId, versionId, revisionId);
299     if (!itemVersion.isPresent()) {
300       throw new RuntimeException(String
301           .format("Item %s, version %s: Cannot revert to revision %s since it is not found",
302               itemId, versionId, revisionId));
303     }
304
305     // TODO: 12/4/2017 force sync is done in order to clear dirty element on private
306     // this is temp solution that should be fixed.
307     forceSyncItemVersion(context, itemId, versionId);
308
309     //updateItemVersion(context, itemId, versionId, itemVersion.get().getData());
310     revertService.revert(context, itemId, versionId, revisionId);
311
312     return new Response<>(new CollaborationMergeChange());
313   }
314
315
316   @Override
317   public Response<Void> commitElements(SessionContext context, Id itemId, Id versionId, String s) {
318     // not needed
319     return new Response(Void.TYPE);
320   }
321
322   @Override
323   public Response<Collection<CollaborationElement>> listElements(SessionContext context,
324                                                                  ElementContext elementContext,
325                                                                  Namespace namespace,
326                                                                  Id elementId) {
327     return new Response<>(elementPrivateStore.listSubs(context, elementContext, elementId).stream()
328         .map(elementEntity -> convertToCollaborationElement(elementContext, elementEntity))
329         .collect(Collectors.toList()));
330   }
331
332   @Override
333   public Response<CollaborationElement> getElement(SessionContext context,
334                                                    ElementContext elementContext,
335                                                    Namespace namespace, Id elementId) {
336     return new Response<>(elementPrivateStore.get(context, elementContext, elementId)
337         .map(elementEntity -> convertToCollaborationElement(elementContext, elementEntity))
338         .orElse(null));
339   }
340
341   @Override
342   public Response<CollaborationElementConflict> getElementConflict(SessionContext context,
343                                                                    ElementContext elementContext,
344                                                                    Namespace namespace,
345                                                                    Id elementId) {
346     Optional<StageEntity<ElementEntity>> conflictedStagedElement =
347         elementStageStore
348             .getConflicted(context, elementContext, new ElementEntity(elementId));
349
350     return new Response<>(conflictedStagedElement
351         .map(stagedElement -> getElementConflict(context, elementContext, stagedElement))
352         .orElse(null));
353   }
354
355   @Override
356   public Response<Void> createElement(SessionContext context, CollaborationElement element) {
357     elementPrivateStore.create(context,
358         new ElementContext(element.getItemId(), element.getVersionId()),
359         convertToElementEntity(element));
360     return new Response(Void.TYPE);
361   }
362
363   @Override
364   public Response<Void> updateElement(SessionContext context, CollaborationElement element) {
365     elementPrivateStore.update(context,
366         new ElementContext(element.getItemId(), element.getVersionId()),
367         convertToElementEntity(element));
368     return new Response(Void.TYPE);
369   }
370
371   @Override
372   public Response<Void> deleteElement(SessionContext context, CollaborationElement element) {
373     elementPrivateStore
374         .delete(context, new ElementContext(element.getItemId(), element.getVersionId()),
375             convertToElementEntity(element));
376
377     return new Response(Void.TYPE);
378   }
379
380   @Override
381   public Response<CollaborationMergeResult> resolveElementConflict(SessionContext context,
382                                                                    CollaborationElement element,
383                                                                    Resolution resolution) {
384     ElementContext elementContext = new ElementContext(element.getItemId(), element.getVersionId());
385     elementStageStore
386         .resolveConflict(context, elementContext, convertToElementEntity(element), resolution);
387     commitStagingService.commitStaging(context, element.getItemId(), element.getVersionId());
388
389     return new Response<>(new CollaborationMergeResult());
390   }
391
392   @Override
393   public Response<ItemVersion> getItemVersion(SessionContext context, Space space, Id itemId,
394                                               Id versionId, Id revisionId) {
395     return new Response<>(getItemVersion(context, itemId, versionId, revisionId).orElse(null));
396   }
397
398   @Override
399   public Response<HealthInfo> checkHealth(SessionContext context) throws ZusammenException {
400     HealthInfo healthInfo = versionPublicStore.checkHealth(context)
401         ? new HealthInfo(Module.ZCSP.getDescription(), HealthStatus.UP, "")
402         : new HealthInfo(Module.ZCSP.getDescription(), HealthStatus.DOWN, "No Schema Available");
403
404     return new Response<>(healthInfo);
405   }
406
407   private Optional<ItemVersion> getItemVersion(SessionContext context, Id itemId, Id versionId,
408                                                Id revisionId) {
409     // since revisions are kept only on public - get from there
410     Optional<VersionEntity> versionEntity = versionPublicStore.get(context, itemId, versionId);
411     if (!versionEntity.isPresent()) {
412       return Optional.empty();
413     }
414
415     return elementPublicStore
416         .getDescriptor(context, new ElementContext(itemId, versionId, revisionId),
417             ROOT_ELEMENTS_PARENT_ID)
418         .map(ZusammenPluginUtil::convertToVersionData)
419         .map(itemVersionData -> convertToItemVersion(versionEntity.get(), itemVersionData));
420   }
421
422   private List<ElementEntity> listVersionElements(SessionContext context,
423                                                   ElementContext elementContext) {
424     return elementPrivateStore.listIds(context, elementContext).entrySet().stream() // TODO:
425         // 9/5/2017 parallel
426         .map(entry -> elementPrivateStore.get(context, elementContext, entry.getKey()).get())
427         .collect(Collectors.toList());
428   }
429
430   private void copyElements(SessionContext context,
431                             ElementContext sourceContext, ElementContext targetContext) {
432     listVersionElements(context, sourceContext).forEach(element -> {
433       // publishTime copied as is and dirty is off
434       Date publishTime =
435           elementPrivateStore.getSynchronizationState(context, sourceContext, element.getId())
436               .get().getPublishTime();
437       elementPrivateStore.commitStagedCreate(context, targetContext, element, publishTime);
438     });
439   }
440
441   private ItemVersionDataConflict getVersionDataConflict(SessionContext context,
442                                                          ElementContext elementContext,
443                                                          StageEntity<ElementEntity> stagedElementDescriptor) {
444     ItemVersionDataConflict versionConflict = new ItemVersionDataConflict();
445     versionConflict.setRemoteData(convertToVersionData(stagedElementDescriptor.getEntity()));
446     if (stagedElementDescriptor.getAction() == Action.UPDATE) {
447       versionConflict.setLocalData(getPrivateVersionData(context, elementContext));
448     }
449     return versionConflict;
450   }
451
452   private ItemVersionData getPrivateVersionData(SessionContext context,
453                                                 ElementContext elementContext) {
454     return elementPrivateStore.getDescriptor(context, elementContext, ROOT_ELEMENTS_PARENT_ID)
455         .map(ZusammenPluginUtil::convertToVersionData)
456         .orElseThrow(() -> new IllegalStateException("Version must have data"));
457   }
458
459   private ElementConflictDescriptor getElementConflictDescriptor(SessionContext context,
460                                                                  ElementContext elementContext,
461                                                                  StageEntity<ElementEntity> stagedElementDescriptor) {
462     ElementDescriptor elementDescriptorFromStage =
463         convertToElementDescriptor(elementContext, (stagedElementDescriptor.getEntity()));
464
465     ElementConflictDescriptor conflictDescriptor = new ElementConflictDescriptor();
466     switch (stagedElementDescriptor.getAction()) {
467       case CREATE:
468         conflictDescriptor.setRemoteElementDescriptor(elementDescriptorFromStage);
469         break;
470       case UPDATE:
471         conflictDescriptor.setRemoteElementDescriptor(elementDescriptorFromStage);
472         conflictDescriptor.setLocalElementDescriptor(convertToElementDescriptor(elementContext,
473             elementPrivateStore
474                 .getDescriptor(context, elementContext, stagedElementDescriptor.getEntity().getId())
475                 .orElse(null)));// updated on public while deleted from private
476         break;
477       case DELETE:
478         conflictDescriptor.setLocalElementDescriptor(elementDescriptorFromStage);
479         break;
480       default:
481         break;
482     }
483     return conflictDescriptor;
484   }
485
486   private void addElementsToChangedElements(ElementContext elementContext,
487                                             Collection<ElementEntity> elements,
488                                             Collection<CollaborationElementChange> changedElements,
489                                             Action action) {
490     elements.stream()
491         .map(elementEntity -> convertToElementChange(elementContext, elementEntity, action))
492         .forEach(changedElements::add);
493   }
494
495   private CollaborationElementConflict getElementConflict(SessionContext context,
496                                                           ElementContext entityContext,
497                                                           StageEntity<ElementEntity> stagedElement) {
498     CollaborationElement elementFromStage =
499         convertToCollaborationElement(entityContext, (stagedElement.getEntity()));
500
501     CollaborationElementConflict conflict = new CollaborationElementConflict();
502     switch (stagedElement.getAction()) {
503       case CREATE:
504         conflict.setRemoteElement(elementFromStage);
505         break;
506       case UPDATE:
507         conflict.setRemoteElement(elementFromStage);
508         conflict.setLocalElement(
509             elementPrivateStore.get(context, entityContext, stagedElement.getEntity().getId())
510                 .map(element -> convertToCollaborationElement(entityContext, element))
511                 .orElse(null));// updated on public while deleted from private
512         break;
513       case DELETE:
514         conflict.setLocalElement(elementFromStage);
515         break;
516       default:
517         break;
518     }
519     return conflict;
520   }
521 }