1 package org.openecomp.core.zusammen.plugin.collaboration;
 
   3 import com.amdocs.zusammen.datatypes.Id;
 
   4 import com.amdocs.zusammen.datatypes.SessionContext;
 
   5 import com.amdocs.zusammen.datatypes.item.Action;
 
   6 import com.amdocs.zusammen.datatypes.item.ElementContext;
 
   7 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationMergeChange;
 
   8 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationMergeConflict;
 
   9 import com.amdocs.zusammen.sdk.collaboration.types.CollaborationMergeResult;
 
  10 import org.openecomp.core.zusammen.plugin.dao.types.ElementEntity;
 
  11 import org.openecomp.core.zusammen.plugin.dao.types.StageEntity;
 
  12 import org.openecomp.core.zusammen.plugin.dao.types.SynchronizationStateEntity;
 
  13 import org.openecomp.core.zusammen.plugin.dao.types.VersionEntity;
 
  15 import java.util.Collection;
 
  16 import java.util.Date;
 
  17 import java.util.HashSet;
 
  18 import java.util.List;
 
  20 import java.util.Optional;
 
  22 import java.util.function.Function;
 
  23 import java.util.function.Predicate;
 
  24 import java.util.stream.Collectors;
 
  26 import static org.openecomp.core.zusammen.plugin.ZusammenPluginConstants.ROOT_ELEMENTS_PARENT_ID;
 
  28 public class SyncService {
 
  29   private static final String PULL_NON_EXISTING_VERSION =
 
  30       "Item Id %s, version Id %s: Non existing version cannot be synced.";
 
  31   private static final String PUBLIC_SYNC_STATE_EXISTS_WITHOUT_ELEMENT =
 
  32       "Item Id %s, version Id %s: Sync state of element with Id %s " +
 
  33           "exists in public space while the element does not";
 
  34   private static final String PRIVATE_UNPUBLISHED_SYNC_STATE_EXISTS_WITHOUT_ELEMENT =
 
  35       "Item Id %s, version Id %s: Sync state of unpublished element with Id %s " +
 
  36           "exists in private space while the element does not";
 
  38   private VersionPublicStore versionPublicStore;
 
  39   private VersionPrivateStore versionPrivateStore;
 
  40   private VersionStageStore versionStageStore;
 
  41   private ElementPublicStore elementPublicStore;
 
  42   private ElementPrivateStore elementPrivateStore;
 
  43   private ElementStageStore elementStageStore;
 
  /**
   * Creates a sync service working against the given stores.
   *
   * @param versionPublicStore  store queried for the public version and its sync state
   * @param versionPrivateStore store queried for the private version sync state
   * @param versionStageStore   store into which version changes are staged
   * @param elementPublicStore  store queried for public elements and their sync states
   * @param elementPrivateStore store queried for private elements and their sync states
   * @param elementStageStore   store into which element changes are staged
   */
  public SyncService(VersionPublicStore versionPublicStore,
                     VersionPrivateStore versionPrivateStore,
                     VersionStageStore versionStageStore,
                     ElementPublicStore elementPublicStore,
                     ElementPrivateStore elementPrivateStore,
                     ElementStageStore elementStageStore) {
    // Element stores.
    this.elementStageStore = elementStageStore;
    this.elementPrivateStore = elementPrivateStore;
    this.elementPublicStore = elementPublicStore;
    // Version stores.
    this.versionStageStore = versionStageStore;
    this.versionPrivateStore = versionPrivateStore;
    this.versionPublicStore = versionPublicStore;
  }
 
  59   public CollaborationMergeResult sync(SessionContext context, Id itemId, Id versionId,
 
  61     SynchronizationStateEntity publicVersionSyncState =
 
  62         versionPublicStore.getSynchronizationState(context, itemId, versionId)
 
  63             .orElseThrow(() -> new IllegalStateException(
 
  64                 String.format(PULL_NON_EXISTING_VERSION, itemId.toString(), versionId.toString())));
 
  66     Date publishTime = publicVersionSyncState.getPublishTime();
 
  68     Optional<SynchronizationStateEntity> privateVersionSyncState =
 
  69         versionPrivateStore.getSynchronizationState(context, itemId, versionId);
 
  71     if (force || !privateVersionSyncState.isPresent() ||
 
  72         !publishTime.equals(privateVersionSyncState.get().getPublishTime())) {
 
  73       ElementContext elementContext =
 
  74           new ElementContext(itemId, versionId, publicVersionSyncState.getRevisionId());
 
  76       Collection<SynchronizationStateEntity> publicSyncStates =
 
  77           elementPublicStore.listSynchronizationStates(context, elementContext);
 
  78       Collection<SynchronizationStateEntity> privateSyncStates =
 
  79           elementPrivateStore.listSynchronizationStates(context, elementContext);
 
  80       Map<Id, SynchronizationStateEntity> publicSyncStateById = toMapById(publicSyncStates);
 
  82       Set<Id> syncedElements = new HashSet<>();
 
  84         List<SynchronizationStateEntity> dirtyPrivateSyncStates = privateSyncStates.stream()
 
  85             .filter(SynchronizationStateEntity::isDirty)
 
  86             .collect(Collectors.toList());
 
  88         forceSyncDirtyElements(context, elementContext, dirtyPrivateSyncStates, publicSyncStateById,
 
  92       if (!privateVersionSyncState.isPresent() ||
 
  93           !publishTime.equals(privateVersionSyncState.get().getPublishTime())) {
 
  94         syncVersion(context, itemId, versionId, publishTime, privateVersionSyncState.isPresent());
 
  95         syncElements(context, elementContext,
 
  96             privateVersionSyncState.map(SynchronizationStateEntity::getPublishTime).orElse(null),
 
  97             publicSyncStates, privateSyncStates, publicSyncStateById, syncedElements);
 
 101     return createResult();
 
 104   private CollaborationMergeResult createResult() {
 
 105     CollaborationMergeResult result = new CollaborationMergeResult();
 
 106     result.setChange(new CollaborationMergeChange());
 
 107     result.setConflict(new CollaborationMergeConflict());
 
 111   private void syncVersion(SessionContext context, Id itemId, Id versionId, Date publishTime,
 
 112                            boolean versionExistOnPrivate) {
 
 113     if (versionExistOnPrivate) {
 
 114       stageVersion(context, itemId, new VersionEntity(versionId), Action.UPDATE, publishTime);
 
 116       stageVersion(context, itemId, versionPublicStore.get(context, itemId, versionId)
 
 117               .orElseThrow(() -> new IllegalArgumentException(String
 
 118                   .format(PULL_NON_EXISTING_VERSION, itemId.toString(), versionId.toString()))),
 
 119           Action.CREATE, publishTime);
 
 123   private void syncElements(SessionContext context, ElementContext elementContext,
 
 124                             Date previousSyncedPublishTime,
 
 125                             Collection<SynchronizationStateEntity> publicSyncStates,
 
 126                             Collection<SynchronizationStateEntity> privateSyncStates,
 
 127                             Map<Id, SynchronizationStateEntity> publicSyncStateById,
 
 128                             Set<Id> syncedElements) {
 
 129     Map<Id, SynchronizationStateEntity> privateSyncStateById = toMapById(privateSyncStates);
 
 131     Collection<SynchronizationStateEntity> updatedPublicSyncStates =
 
 132         previousSyncedPublishTime == null
 
 134             : publicSyncStates.stream()
 
 135                 .filter(syncState -> syncState.getPublishTime().after(previousSyncedPublishTime))
 
 136                 .collect(Collectors.toList());
 
 138     syncPublicUpdatedElements(context, elementContext, updatedPublicSyncStates,
 
 139         publicSyncStateById, privateSyncStateById, syncedElements);
 
 141     List<SynchronizationStateEntity> onlyOnPrivatePublishedSyncStates =
 
 142         privateSyncStates.stream()
 
 143             .filter(syncState -> !publicSyncStateById.containsKey(syncState.getId()) &&
 
 144                 syncState.getPublishTime() != null)
 
 145             .collect(Collectors.toList());
 
 147     syncPublicDeletedElements(context, elementContext, onlyOnPrivatePublishedSyncStates,
 
 148         publicSyncStateById, privateSyncStateById, syncedElements);
 
 151   private void syncPublicUpdatedElements(SessionContext context, ElementContext elementContext,
 
 152                                          Collection<SynchronizationStateEntity> updatedPublicSyncStates,
 
 153                                          Map<Id, SynchronizationStateEntity> publicSyncStateById,
 
 154                                          Map<Id, SynchronizationStateEntity> privateSyncStateById,
 
 155                                          Set<Id> syncedElements) {
 
 156     for (SynchronizationStateEntity publicSyncState : updatedPublicSyncStates) {
 
 157       if (syncedElements.contains(publicSyncState.getId())) {
 
 161       ElementEntity publicElement =
 
 162           elementPublicStore.get(context, elementContext, publicSyncState.getId()).orElseThrow(
 
 163               () -> new IllegalStateException(String
 
 164                   .format(PUBLIC_SYNC_STATE_EXISTS_WITHOUT_ELEMENT,
 
 165                       elementContext.getItemId().getValue(),
 
 166                       elementContext.getVersionId().getValue(),
 
 167                       publicSyncState.getId().getValue())));
 
 169       SynchronizationStateEntity privateSyncState =
 
 170           privateSyncStateById.get(publicSyncState.getId());
 
 172       if (privateSyncState != null) {
 
 173         if (!privateSyncState.isDirty()) {
 
 174           // not changed on private
 
 175           stageElement(context, elementContext, publicElement,
 
 176               publicSyncState.getPublishTime(),
 
 177               Action.UPDATE, false, null);
 
 178           syncedElements.add(publicSyncState.getId());
 
 180           Optional<ElementEntity> privateElement =
 
 181               elementPrivateStore.get(context, elementContext, publicSyncState.getId());
 
 183           if (privateElement.isPresent()) {
 
 184             // updated on private - conflict if it has different hash
 
 185             stageElement(context, elementContext, publicElement,
 
 186                 publicSyncState.getPublishTime(), Action.UPDATE,
 
 187                 !publicElement.getElementHash().equals(privateElement.get().getElementHash()),
 
 190             syncedElements.add(publicSyncState.getId());
 
 192             // deleted on private - conflict tree
 
 193             Set<Id> changeTreeElementIds =
 
 194                 stagePublicElementTree(context, elementContext, publicElement, publicSyncStateById,
 
 195                     (treeElementIds) -> true);
 
 196             syncedElements.addAll(changeTreeElementIds);
 
 200         // not existing on private - new creation on public
 
 201         Set<Id> changeTreeElementIds =
 
 202             stagePublicElementTree(context, elementContext, publicElement, publicSyncStateById,
 
 203                 (treeElementIds) -> containsDirty(treeElementIds, privateSyncStateById));
 
 204         syncedElements.addAll(changeTreeElementIds);
 
 209   private void syncPublicDeletedElements(
 
 210       SessionContext context, ElementContext elementContext,
 
 211       Collection<SynchronizationStateEntity> onlyOnPrivatePublishedSyncStates,
 
 212       Map<Id, SynchronizationStateEntity> publicSyncStateById,
 
 213       Map<Id, SynchronizationStateEntity> privateSyncStateById,
 
 214       Set<Id> syncedElements) {
 
 215     for (SynchronizationStateEntity privateSyncState : onlyOnPrivatePublishedSyncStates) {
 
 216       if (syncedElements.contains(privateSyncState.getId())) {
 
 220       Optional<ElementEntity> privateElement =
 
 221           elementPrivateStore.get(context, elementContext, privateSyncState.getId());
 
 223       if (!privateElement.isPresent()) {
 
 224         // deleted on private as well
 
 225         stageElement(context, elementContext, new ElementEntity(privateSyncState.getId()),
 
 226             null, Action.DELETE, false, null);
 
 227         syncedElements.add(privateSyncState.getId());
 
 229         Set<Id> changeTreeElementIds =
 
 230             stageElementTree(context, elementContext, privateElement.get(),
 
 231                 elementPrivateStore, publicSyncStateById::containsKey,
 
 232                 (treeElementIds) -> containsDirty(treeElementIds, privateSyncStateById),
 
 233                 (elementId) -> null, Action.DELETE);
 
 234         syncedElements.addAll(changeTreeElementIds);
 
 239   private void forceSyncDirtyElements(SessionContext context, ElementContext elementContext,
 
 240                                       List<SynchronizationStateEntity> dirtyPrivateSyncStates,
 
 241                                       Map<Id, SynchronizationStateEntity> publicSyncStateById,
 
 242                                       Set<Id> syncedElements) {
 
 243     for (SynchronizationStateEntity privateSyncState : dirtyPrivateSyncStates) {
 
 244       Optional<ElementEntity> privateElement =
 
 245           elementPrivateStore.get(context, elementContext, privateSyncState.getId());
 
 246       if (privateSyncState.getPublishTime() == null) {
 
 247         stageElement(context, elementContext,
 
 248             privateElement.orElseThrow(() -> new IllegalStateException(
 
 249                 String.format(PRIVATE_UNPUBLISHED_SYNC_STATE_EXISTS_WITHOUT_ELEMENT,
 
 250                     elementContext.getItemId().getValue(),
 
 251                     elementContext.getVersionId().getValue(),
 
 252                     privateSyncState.getId().getValue()))),
 
 253             null, Action.DELETE, false, null);
 
 255         SynchronizationStateEntity publicSyncState =
 
 256             publicSyncStateById.get(privateSyncState.getId());
 
 257         if (publicSyncState != null) {
 
 258           ElementEntity publicElement =
 
 259               elementPublicStore.get(context, elementContext, privateSyncState.getId()).orElseThrow(
 
 260                   () -> new IllegalStateException(String
 
 261                       .format(PUBLIC_SYNC_STATE_EXISTS_WITHOUT_ELEMENT,
 
 262                           elementContext.getItemId().getValue(),
 
 263                           elementContext.getVersionId().getValue(),
 
 264                           privateSyncState.getId().getValue())));
 
 266           stageElement(context, elementContext, publicElement, publicSyncState.getPublishTime(),
 
 267               privateElement.isPresent() ? Action.UPDATE : Action.CREATE, false, null);
 
 269           stageElement(context, elementContext, privateElement.isPresent()
 
 270                   ? privateElement.get()
 
 271                   : new ElementEntity(privateSyncState.getId()),
 
 272               null, Action.DELETE, false, null);
 
 275       syncedElements.add(privateSyncState.getId());
 
 279   private Set<Id> stagePublicElementTree(SessionContext context,
 
 280                                          ElementContext elementContext,
 
 281                                          ElementEntity publicElement,
 
 282                                          Map<Id, SynchronizationStateEntity> publicSyncStateById,
 
 283                                          Predicate<Set<Id>> isElementTreeConflicted) {
 
 286     return stageElementTree(context, elementContext, publicElement,
 
 288         (elementId) -> elementPrivateStore.getDescriptor(context, elementContext, elementId)
 
 290         isElementTreeConflicted,
 
 291         (elementId) -> publicSyncStateById.get(elementId).getPublishTime(),
 
 295   private Set<Id> stageElementTree(SessionContext context, ElementContext elementContext,
 
 296                                    ElementEntity element,
 
 297                                    ElementStore elementStore,
 
 298                                    Predicate<Id> isElementExist,
 
 299                                    Predicate<Set<Id>> isElementTreeConflicted,
 
 300                                    Function<Id, Date> stagePublishTimeGetter,
 
 301                                    Action stageAction) {
 
 302     ElementEntity elementTreeRoot = findRootElementOfChange(context, elementContext,
 
 303         elementStore, isElementExist, element);
 
 305     Set<Id> elementTreeIds = new HashSet<>();
 
 306     elementTreeIds.add(elementTreeRoot.getId());
 
 308     Set<Id> subElementIds = stageElementSubs(context, elementContext, elementStore, elementTreeRoot,
 
 309         stagePublishTimeGetter, stageAction);
 
 310     elementTreeIds.addAll(subElementIds);
 
 312     boolean conflicted = isElementTreeConflicted.test(elementTreeIds);
 
 313     stageElement(context, elementContext, elementTreeRoot,
 
 314         stagePublishTimeGetter.apply(elementTreeRoot.getId()), stageAction, conflicted,
 
 315         conflicted ? subElementIds : null);
 
 316     return elementTreeIds;
 
 319   private ElementEntity findRootElementOfChange(SessionContext context,
 
 320                                                 ElementContext elementContext,
 
 321                                                 ElementStore elementStore,
 
 322                                                 Predicate<Id> isElementExistOnOppositeStore,
 
 323                                                 ElementEntity element) {
 
 324     return element.getId().equals(ROOT_ELEMENTS_PARENT_ID) ||
 
 325         isElementExistOnOppositeStore.test(element.getParentId())
 
 327         : findRootElementOfChange(context, elementContext, elementStore,
 
 328             isElementExistOnOppositeStore,
 
 329             elementStore.get(context, elementContext, element.getParentId())
 
 330                 .orElseThrow(() -> new IllegalStateException(
 
 331                     String.format("Element %s exists while its parent element %s does not",
 
 332                         element.getId(), element.getParentId()))));
 
 335   private boolean containsDirty(Set<Id> elementIds,
 
 336                                 Map<Id, SynchronizationStateEntity> syncStateById) {
 
 337     return elementIds.stream().anyMatch(elementId -> {
 
 338       SynchronizationStateEntity privateSyncState = syncStateById.get(elementId);
 
 339       return privateSyncState != null && privateSyncState.isDirty();
 
 343   private Set<Id> stageElementSubs(SessionContext context, ElementContext elementContext,
 
 344                                    ElementStore elementStore, ElementEntity parentElement,
 
 345                                    Function<Id, Date> stagePublishTimeGetter, Action stageAction) {
 
 346     Set<Id> elementTreeIds = new HashSet<>();
 
 347     for (Id elementId : parentElement.getSubElementIds()) {
 
 348       ElementEntity element = elementStore.get(context, elementContext, elementId).get();
 
 350       stageElement(context, elementContext, element, stagePublishTimeGetter.apply(elementId),
 
 351           stageAction, false, null);
 
 353       elementTreeIds.add(elementId);
 
 354       elementTreeIds.addAll(
 
 355           stageElementSubs(context, elementContext, elementStore, element, stagePublishTimeGetter,
 
 358     return elementTreeIds;
 
 361   private void stageElement(SessionContext context, ElementContext elementContext,
 
 362                             ElementEntity element, Date publishTime, Action action,
 
 363                             boolean conflicted, Set<Id> conflictDependents) {
 
 364     StageEntity<ElementEntity> elementStage =
 
 365         new StageEntity<>(element, publishTime, action, conflicted);
 
 366     if (conflictDependents != null) {
 
 367       elementStage.setConflictDependents(
 
 368           conflictDependents.stream().map(ElementEntity::new).collect(Collectors.toSet()));
 
 370     elementStageStore.create(context, elementContext, elementStage);
 
 373   private void stageVersion(SessionContext context, Id itemId, VersionEntity stageVersion,
 
 374                             Action stageAction, Date publishTime) {
 
 376         .create(context, itemId, new StageEntity<>(stageVersion, publishTime, stageAction, false));
 
 379   private Map<Id, SynchronizationStateEntity> toMapById(
 
 380       Collection<SynchronizationStateEntity> syncStates) {
 
 381     return syncStates.stream()
 
 382         .collect(Collectors.toMap(SynchronizationStateEntity::getId, Function.identity()));