2  * Copyright © 2016-2017 European Support Limited
 
   4  * Licensed under the Apache License, Version 2.0 (the "License");
 
   5  * you may not use this file except in compliance with the License.
 
   6  * You may obtain a copy of the License at
 
   8  *     http://www.apache.org/licenses/LICENSE-2.0
 
  10  * Unless required by applicable law or agreed to in writing, software
 
  11  * distributed under the License is distributed on an "AS IS" BASIS,
 
  12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  13  * See the License for the specific language governing permissions and
 
  14  * limitations under the License.
 
  17 package org.openecomp.core.zusammen.plugin.main;
 
  19 import com.amdocs.zusammen.commons.health.data.HealthInfo;
 
  20 import com.amdocs.zusammen.commons.health.data.HealthStatus;
 
  21 import com.amdocs.zusammen.datatypes.Id;
 
  22 import com.amdocs.zusammen.datatypes.Namespace;
 
  23 import com.amdocs.zusammen.datatypes.SessionContext;
 
  24 import com.amdocs.zusammen.datatypes.Space;
 
  25 import com.amdocs.zusammen.datatypes.item.Action;
 
  26 import com.amdocs.zusammen.datatypes.item.ElementContext;
 
  27 import com.amdocs.zusammen.datatypes.item.Info;
 
  28 import com.amdocs.zusammen.datatypes.item.ItemVersion;
 
  29 import com.amdocs.zusammen.datatypes.item.ItemVersionData;
 
  30 import com.amdocs.zusammen.datatypes.item.ItemVersionDataConflict;
 
  31 import com.amdocs.zusammen.datatypes.item.ItemVersionStatus;
 
  32 import com.amdocs.zusammen.datatypes.item.Resolution;
 
  33 import com.amdocs.zusammen.datatypes.itemversion.ItemVersionRevisions;
 
  34 import com.amdocs.zusammen.datatypes.itemversion.Revision;
 
  35 import com.amdocs.zusammen.datatypes.itemversion.Tag;
 
  36 import com.amdocs.zusammen.datatypes.response.ErrorCode;
 
  37 import com.amdocs.zusammen.datatypes.response.Module;
 
  38 import com.amdocs.zusammen.datatypes.response.Response;
 
  39 import com.amdocs.zusammen.datatypes.response.ReturnCode;
 
  40 import com.amdocs.zusammen.datatypes.response.ZusammenException;
 
  41 import com.amdocs.zusammen.sdk.collaboration.CollaborationStore;
 
  42 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationElement;
 
  43 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationElementChange;
 
  44 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationElementConflict;
 
  45 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationItemVersionConflict;
 
  46 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationMergeChange;
 
  47 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationMergeResult;
 
  48 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationPublishResult;
 
  49 import com.amdocs.zusammen.sdk.types.ElementConflictDescriptor;
 
  50 import com.amdocs.zusammen.sdk.types.ElementDescriptor;
 
  51 import org.openecomp.core.zusammen.plugin.ZusammenPluginUtil;
 
  52 import org.openecomp.core.zusammen.plugin.collaboration.CommitStagingService;
 
  53 import org.openecomp.core.zusammen.plugin.collaboration.ElementPrivateStore;
 
  54 import org.openecomp.core.zusammen.plugin.collaboration.ElementPublicStore;
 
  55 import org.openecomp.core.zusammen.plugin.collaboration.ElementStageStore;
 
  56 import org.openecomp.core.zusammen.plugin.collaboration.PublishService;
 
  57 import org.openecomp.core.zusammen.plugin.collaboration.RevertService;
 
  58 import org.openecomp.core.zusammen.plugin.collaboration.SyncService;
 
  59 import org.openecomp.core.zusammen.plugin.collaboration.VersionPrivateStore;
 
  60 import org.openecomp.core.zusammen.plugin.collaboration.VersionPublicStore;
 
  61 import org.openecomp.core.zusammen.plugin.collaboration.VersionStageStore;
 
  62 import org.openecomp.core.zusammen.plugin.collaboration.impl.ElementPrivateStoreImpl;
 
  63 import org.openecomp.core.zusammen.plugin.collaboration.impl.ElementPublicStoreImpl;
 
  64 import org.openecomp.core.zusammen.plugin.collaboration.impl.ElementStageStoreImpl;
 
  65 import org.openecomp.core.zusammen.plugin.collaboration.impl.VersionPrivateStoreImpl;
 
  66 import org.openecomp.core.zusammen.plugin.collaboration.impl.VersionPublicStoreImpl;
 
  67 import org.openecomp.core.zusammen.plugin.collaboration.impl.VersionStageStoreImpl;
 
  68 import org.openecomp.core.zusammen.plugin.dao.types.ElementEntity;
 
  69 import org.openecomp.core.zusammen.plugin.dao.types.StageEntity;
 
  70 import org.openecomp.core.zusammen.plugin.dao.types.SynchronizationStateEntity;
 
  71 import org.openecomp.core.zusammen.plugin.dao.types.VersionDataElement;
 
  72 import org.openecomp.core.zusammen.plugin.dao.types.VersionEntity;
 
  74 import java.util.Collection;
 
  75 import java.util.Date;
 
  76 import java.util.List;
 
  77 import java.util.Optional;
 
  78 import java.util.stream.Collectors;
 
  80 import static com.amdocs.zusammen.datatypes.item.SynchronizationStatus.MERGING;
 
  81 import static com.amdocs.zusammen.datatypes.item.SynchronizationStatus.OUT_OF_SYNC;
 
  82 import static com.amdocs.zusammen.datatypes.item.SynchronizationStatus.UP_TO_DATE;
 
  83 import static org.openecomp.core.zusammen.plugin.ZusammenPluginConstants.ROOT_ELEMENTS_PARENT_ID;
 
  84 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToCollaborationElement;
 
  85 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToElementChange;
 
  86 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToElementDescriptor;
 
  87 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToElementEntity;
 
  88 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToItemVersion;
 
  89 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToVersionData;
 
  90 import static org.openecomp.core.zusammen.plugin.ZusammenPluginUtil.convertToVersionEntity;
 
  92 public class CassandraCollaborationStorePluginImpl implements CollaborationStore {
 
  93   // TODO: 8/15/2017 inject
 
  95   private VersionPrivateStore versionPrivateStore = new VersionPrivateStoreImpl();
 
  96   private VersionPublicStore versionPublicStore = new VersionPublicStoreImpl();
 
  97   private VersionStageStore versionStageStore = new VersionStageStoreImpl();
 
  99   private ElementPrivateStore elementPrivateStore = new ElementPrivateStoreImpl();
 
 100   private ElementPublicStore elementPublicStore = new ElementPublicStoreImpl();
 
 101   private ElementStageStore elementStageStore = new ElementStageStoreImpl();
 
 104   private CommitStagingService commitStagingService =
 
 105       new CommitStagingService(versionPrivateStore, versionStageStore, elementPrivateStore,
 
 107   private PublishService publishService =
 
 108       new PublishService(versionPublicStore, versionPrivateStore, elementPublicStore,
 
 109           elementPrivateStore);
 
 110   private SyncService syncService =
 
 111       new SyncService(versionPublicStore, versionPrivateStore, versionStageStore,
 
 112           elementPublicStore, elementPrivateStore, elementStageStore);
 
 114   private RevertService revertService =
 
 115       new RevertService(elementPublicStore, elementPrivateStore);
 
 118   public Response<Void> createItem(SessionContext context, Id itemId, Info info) {
 
 119     // done by state store
 
 120     return new Response(Void.TYPE);
 
 124   public Response<Void> deleteItem(SessionContext context, Id itemId) {
 
 125     // done by state store
 
 126     return new Response(Void.TYPE);
 
 130   public Response<Void> createItemVersion(SessionContext context, Id itemId, Id baseVersionId,
 
 131                                           Id versionId, ItemVersionData itemVersionData) {
 
 132     Date creationTime = new Date();
 
 133     versionPrivateStore.create(context, itemId,
 
 134         convertToVersionEntity(versionId, baseVersionId, creationTime, creationTime));
 
 136     ElementContext elementContext = new ElementContext(itemId, versionId);
 
 137     VersionDataElement versionData = new VersionDataElement(itemVersionData);
 
 139     if (baseVersionId == null) {
 
 140       elementPrivateStore.create(context, elementContext, versionData);
 
 142       copyElements(context, new ElementContext(itemId, baseVersionId), elementContext);
 
 143       elementPrivateStore.update(context, elementContext, versionData);
 
 146     return new Response(Void.TYPE);
 
 150   public Response<Void> updateItemVersion(SessionContext context, Id itemId, Id versionId,
 
 151                                           ItemVersionData itemVersionData) {
 
 153     if (elementPrivateStore.update(context, new ElementContext(itemId, versionId),
 
 154         new VersionDataElement(itemVersionData))) {
 
 156       VersionEntity version = new VersionEntity(versionId);
 
 157       version.setModificationTime(new Date());
 
 158       versionPrivateStore.update(context, itemId, version);
 
 161     return new Response(Void.TYPE);
 
 165   public Response<Void> deleteItemVersion(SessionContext context, Id itemId, Id versionId) {
 
 167         .delete(context, new ElementContext(itemId, versionId), new VersionDataElement());
 
 169     versionPrivateStore.delete(context, itemId, new VersionEntity(versionId));
 
 170     return new Response(Void.TYPE);
 
 174   public Response<ItemVersionStatus> getItemVersionStatus(SessionContext context, Id itemId,
 
 176     if (versionStageStore.get(context, itemId, new VersionEntity(versionId)).isPresent()) {
 
 177       return new Response<>(new ItemVersionStatus(MERGING, true));
 
 180     Optional<SynchronizationStateEntity> publicSyncState =
 
 181         versionPublicStore.getSynchronizationState(context, itemId, versionId);
 
 183     if (!publicSyncState.isPresent()) {
 
 184       return new Response<>(new ItemVersionStatus(UP_TO_DATE, true));
 
 187     SynchronizationStateEntity privateSyncState =
 
 188         versionPrivateStore.getSynchronizationState(context, itemId, versionId)
 
 190             .orElseThrow(() -> new IllegalStateException("private version must exist"));
 
 192     return new Response<>(new ItemVersionStatus(
 
 193         privateSyncState.getPublishTime().equals(publicSyncState.get().getPublishTime())
 
 196         privateSyncState.isDirty()));
 
 200   public Response<Void> tagItemVersion(SessionContext context, Id itemId, Id versionId,
 
 203    /* if (revisionId != null) {
 
 204       throw new UnsupportedOperationException(
 
 205           "In this plugin implementation tag is supported only on versionId");
 
 208     copyElements(context,
 
 209         new ElementContext(itemId, versionId),
 
 210         new ElementContext(itemId, versionId, tag.getName()));*/
 
 212     return new Response(Void.TYPE);
 
 216   public Response<CollaborationPublishResult> publishItemVersion(SessionContext context,
 
 217                                                                  Id itemId, Id versionId,
 
 220       return new Response<>(publishService.publish(context, itemId, versionId, message));
 
 221     } catch (ZusammenException ze) {
 
 222       return new Response<>(
 
 223           new ReturnCode(ErrorCode.CL_ITEM_VERSION_PUBLISH, Module.ZCSP, null, ze.getReturnCode()));
 
 228   public Response<CollaborationMergeResult> syncItemVersion(SessionContext context, Id itemId,
 
 230     CollaborationMergeResult result = syncService.sync(context, itemId, versionId, false);
 
 231     commitStagingService.commitStaging(context, itemId, versionId);
 
 233     return new Response<>(result);
 
 237   public Response<CollaborationMergeResult> forceSyncItemVersion(SessionContext context, Id itemId,
 
 239     CollaborationMergeResult result = syncService.sync(context, itemId, versionId, true);
 
 240     commitStagingService.commitStaging(context, itemId, versionId);
 
 242     return new Response<>(result);
 
 246   public Response<CollaborationMergeResult> mergeItemVersion(SessionContext context, Id itemId,
 
 247                                                              Id versionId, Id sourceVersionId) {
 
 248     throw new UnsupportedOperationException("mergeItemVersion");
 
 252   public Response<CollaborationItemVersionConflict> getItemVersionConflict(SessionContext context,
 
 255     ElementContext elementContext = new ElementContext(itemId, versionId, Id.ZERO);
 
 257     Collection<StageEntity<ElementEntity>> conflictedStagedElementDescriptors =
 
 258         elementStageStore.listConflictedDescriptors(context, elementContext);
 
 260     CollaborationItemVersionConflict result = new CollaborationItemVersionConflict();
 
 261     for (StageEntity<ElementEntity> stagedElementDescriptor : conflictedStagedElementDescriptors) {
 
 262       if (ROOT_ELEMENTS_PARENT_ID.equals(stagedElementDescriptor.getEntity().getId())) {
 
 263         result.setVersionDataConflict(
 
 264             getVersionDataConflict(context, elementContext, stagedElementDescriptor));
 
 266         result.getElementConflictDescriptors()
 
 267             .add(getElementConflictDescriptor(context, elementContext, stagedElementDescriptor));
 
 270     return new Response<>(result);
 
 274   public Response<ItemVersionRevisions> listItemVersionRevisions(SessionContext context, Id itemId,
 
 276     return new Response<>(versionPublicStore.listItemVersionRevisions(context, itemId, versionId));
 
 280   public Response<Revision> getItemVersionRevision(SessionContext context, Id itemId, Id versionId,
 
 282     throw new UnsupportedOperationException(
 
 283         "get revision is not supported in the current cassandra plugin");
 
 287   public Response<CollaborationMergeChange> resetItemVersionRevision(SessionContext context,
 
 288                                                                      Id itemId, Id versionId,
 
 290     throw new UnsupportedOperationException("resetItemVersionRevision function not supported");
 
 295   public Response<CollaborationMergeChange> revertItemVersionRevision(SessionContext context,
 
 296                                                                       Id itemId, Id versionId,
 
 298     Optional<ItemVersion> itemVersion = getItemVersion(context, itemId, versionId, revisionId);
 
 299     if (!itemVersion.isPresent()) {
 
 300       throw new RuntimeException(String
 
 301           .format("Item %s, version %s: Cannot revert to revision %s since it is not found",
 
 302               itemId, versionId, revisionId));
 
 305     // TODO: 12/4/2017 force sync is done in order to clear dirty element on private
 
 306     // this is temp solution that should be fixed.
 
 307     forceSyncItemVersion(context, itemId, versionId);
 
 309     //updateItemVersion(context, itemId, versionId, itemVersion.get().getData());
 
 310     revertService.revert(context, itemId, versionId, revisionId);
 
 312     return new Response<>(new CollaborationMergeChange());
 
 317   public Response<Void> commitElements(SessionContext context, Id itemId, Id versionId, String s) {
 
 319     return new Response(Void.TYPE);
 
 323   public Response<Collection<CollaborationElement>> listElements(SessionContext context,
 
 324                                                                  ElementContext elementContext,
 
 327     return new Response<>(elementPrivateStore.listSubs(context, elementContext, elementId).stream()
 
 328         .map(elementEntity -> convertToCollaborationElement(elementContext, elementEntity))
 
 329         .collect(Collectors.toList()));
 
 333   public Response<CollaborationElement> getElement(SessionContext context,
 
 334                                                    ElementContext elementContext,
 
 335                                                    Namespace namespace, Id elementId) {
 
 336     return new Response<>(elementPrivateStore.get(context, elementContext, elementId)
 
 337         .map(elementEntity -> convertToCollaborationElement(elementContext, elementEntity))
 
 342   public Response<CollaborationElementConflict> getElementConflict(SessionContext context,
 
 343                                                                    ElementContext elementContext,
 
 346     Optional<StageEntity<ElementEntity>> conflictedStagedElement =
 
 348             .getConflicted(context, elementContext, new ElementEntity(elementId));
 
 350     return new Response<>(conflictedStagedElement
 
 351         .map(stagedElement -> getElementConflict(context, elementContext, stagedElement))
 
 356   public Response<Void> createElement(SessionContext context, CollaborationElement element) {
 
 357     elementPrivateStore.create(context,
 
 358         new ElementContext(element.getItemId(), element.getVersionId()),
 
 359         convertToElementEntity(element));
 
 360     return new Response(Void.TYPE);
 
 364   public Response<Void> updateElement(SessionContext context, CollaborationElement element) {
 
 365     elementPrivateStore.update(context,
 
 366         new ElementContext(element.getItemId(), element.getVersionId()),
 
 367         convertToElementEntity(element));
 
 368     return new Response(Void.TYPE);
 
 372   public Response<Void> deleteElement(SessionContext context, CollaborationElement element) {
 
 374         .delete(context, new ElementContext(element.getItemId(), element.getVersionId()),
 
 375             convertToElementEntity(element));
 
 377     return new Response(Void.TYPE);
 
 381   public Response<CollaborationMergeResult> resolveElementConflict(SessionContext context,
 
 382                                                                    CollaborationElement element,
 
 383                                                                    Resolution resolution) {
 
 384     ElementContext elementContext = new ElementContext(element.getItemId(), element.getVersionId());
 
 386         .resolveConflict(context, elementContext, convertToElementEntity(element), resolution);
 
 387     commitStagingService.commitStaging(context, element.getItemId(), element.getVersionId());
 
 389     return new Response<>(new CollaborationMergeResult());
 
 393   public Response<ItemVersion> getItemVersion(SessionContext context, Space space, Id itemId,
 
 394                                               Id versionId, Id revisionId) {
 
 395     return new Response<>(getItemVersion(context, itemId, versionId, revisionId).orElse(null));
 
 399   public Response<HealthInfo> checkHealth(SessionContext context) throws ZusammenException {
 
 400     HealthInfo healthInfo = versionPublicStore.checkHealth(context)
 
 401         ? new HealthInfo(Module.ZCSP.getDescription(), HealthStatus.UP, "")
 
 402         : new HealthInfo(Module.ZCSP.getDescription(), HealthStatus.DOWN, "No Schema Available");
 
 404     return new Response<>(healthInfo);
 
 407   private Optional<ItemVersion> getItemVersion(SessionContext context, Id itemId, Id versionId,
 
 409     // since revisions are kept only on public - get from there
 
 410     Optional<VersionEntity> versionEntity = versionPublicStore.get(context, itemId, versionId);
 
 411     if (!versionEntity.isPresent()) {
 
 412       return Optional.empty();
 
 415     return elementPublicStore
 
 416         .getDescriptor(context, new ElementContext(itemId, versionId, revisionId),
 
 417             ROOT_ELEMENTS_PARENT_ID)
 
 418         .map(ZusammenPluginUtil::convertToVersionData)
 
 419         .map(itemVersionData -> convertToItemVersion(versionEntity.get(), itemVersionData));
 
 422   private List<ElementEntity> listVersionElements(SessionContext context,
 
 423                                                   ElementContext elementContext) {
 
 424     return elementPrivateStore.listIds(context, elementContext).entrySet().stream() // TODO:
 
 426         .map(entry -> elementPrivateStore.get(context, elementContext, entry.getKey()).get())
 
 427         .collect(Collectors.toList());
 
 430   private void copyElements(SessionContext context,
 
 431                             ElementContext sourceContext, ElementContext targetContext) {
 
 432     listVersionElements(context, sourceContext).forEach(element -> {
 
 433       // publishTime copied as is and dirty is off
 
 435           elementPrivateStore.getSynchronizationState(context, sourceContext, element.getId())
 
 436               .get().getPublishTime();
 
 437       elementPrivateStore.commitStagedCreate(context, targetContext, element, publishTime);
 
 441   private ItemVersionDataConflict getVersionDataConflict(SessionContext context,
 
 442                                                          ElementContext elementContext,
 
 443                                                          StageEntity<ElementEntity> stagedElementDescriptor) {
 
 444     ItemVersionDataConflict versionConflict = new ItemVersionDataConflict();
 
 445     versionConflict.setRemoteData(convertToVersionData(stagedElementDescriptor.getEntity()));
 
 446     if (stagedElementDescriptor.getAction() == Action.UPDATE) {
 
 447       versionConflict.setLocalData(getPrivateVersionData(context, elementContext));
 
 449     return versionConflict;
 
 452   private ItemVersionData getPrivateVersionData(SessionContext context,
 
 453                                                 ElementContext elementContext) {
 
 454     return elementPrivateStore.getDescriptor(context, elementContext, ROOT_ELEMENTS_PARENT_ID)
 
 455         .map(ZusammenPluginUtil::convertToVersionData)
 
 456         .orElseThrow(() -> new IllegalStateException("Version must have data"));
 
 459   private ElementConflictDescriptor getElementConflictDescriptor(SessionContext context,
 
 460                                                                  ElementContext elementContext,
 
 461                                                                  StageEntity<ElementEntity> stagedElementDescriptor) {
 
 462     ElementDescriptor elementDescriptorFromStage =
 
 463         convertToElementDescriptor(elementContext, (stagedElementDescriptor.getEntity()));
 
 465     ElementConflictDescriptor conflictDescriptor = new ElementConflictDescriptor();
 
 466     switch (stagedElementDescriptor.getAction()) {
 
 468         conflictDescriptor.setRemoteElementDescriptor(elementDescriptorFromStage);
 
 471         conflictDescriptor.setRemoteElementDescriptor(elementDescriptorFromStage);
 
 472         conflictDescriptor.setLocalElementDescriptor(convertToElementDescriptor(elementContext,
 
 474                 .getDescriptor(context, elementContext, stagedElementDescriptor.getEntity().getId())
 
 475                 .orElse(null)));// updated on public while deleted from private
 
 478         conflictDescriptor.setLocalElementDescriptor(elementDescriptorFromStage);
 
 483     return conflictDescriptor;
 
 486   private void addElementsToChangedElements(ElementContext elementContext,
 
 487                                             Collection<ElementEntity> elements,
 
 488                                             Collection<CollaborationElementChange> changedElements,
 
 491         .map(elementEntity -> convertToElementChange(elementContext, elementEntity, action))
 
 492         .forEach(changedElements::add);
 
 495   private CollaborationElementConflict getElementConflict(SessionContext context,
 
 496                                                           ElementContext entityContext,
 
 497                                                           StageEntity<ElementEntity> stagedElement) {
 
 498     CollaborationElement elementFromStage =
 
 499         convertToCollaborationElement(entityContext, (stagedElement.getEntity()));
 
 501     CollaborationElementConflict conflict = new CollaborationElementConflict();
 
 502     switch (stagedElement.getAction()) {
 
 504         conflict.setRemoteElement(elementFromStage);
 
 507         conflict.setRemoteElement(elementFromStage);
 
 508         conflict.setLocalElement(
 
 509             elementPrivateStore.get(context, entityContext, stagedElement.getEntity().getId())
 
 510                 .map(element -> convertToCollaborationElement(entityContext, element))
 
 511                 .orElse(null));// updated on public while deleted from private
 
 514         conflict.setLocalElement(elementFromStage);