package org.openecomp.core.zusammen.plugin.collaboration;

import com.amdocs.zusammen.datatypes.Id;
import com.amdocs.zusammen.datatypes.SessionContext;
import com.amdocs.zusammen.datatypes.item.Action;
import com.amdocs.zusammen.datatypes.item.ElementContext;
import com.amdocs.zusammen.sdk.collaboration.types.CollaborationMergeChange;
import com.amdocs.zusammen.sdk.collaboration.types.CollaborationMergeConflict;
import com.amdocs.zusammen.sdk.collaboration.types.CollaborationMergeResult;
import org.openecomp.core.zusammen.plugin.dao.types.ElementEntity;
import org.openecomp.core.zusammen.plugin.dao.types.StageEntity;
import org.openecomp.core.zusammen.plugin.dao.types.SynchronizationStateEntity;
import org.openecomp.core.zusammen.plugin.dao.types.VersionEntity;

import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.openecomp.core.zusammen.plugin.ZusammenPluginConstants.ROOT_ELEMENTS_PARENT_ID;

28 public class SyncService {
29 private static final String PULL_NON_EXISTING_VERSION =
30 "Item Id %s, version Id %s: Non existing version cannot be synced.";
31 private static final String PUBLIC_SYNC_STATE_EXISTS_WITHOUT_ELEMENT =
32 "Item Id %s, version Id %s: Sync state of element with Id %s " +
33 "exists in public space while the element does not";
34 private static final String PRIVATE_UNPUBLISHED_SYNC_STATE_EXISTS_WITHOUT_ELEMENT =
35 "Item Id %s, version Id %s: Sync state of unpublished element with Id %s " +
36 "exists in private space while the element does not";
38 private VersionPublicStore versionPublicStore;
39 private VersionPrivateStore versionPrivateStore;
40 private VersionStageStore versionStageStore;
41 private ElementPublicStore elementPublicStore;
42 private ElementPrivateStore elementPrivateStore;
43 private ElementStageStore elementStageStore;
45 public SyncService(VersionPublicStore versionPublicStore,
46 VersionPrivateStore versionPrivateStore,
47 VersionStageStore versionStageStore,
48 ElementPublicStore elementPublicStore,
49 ElementPrivateStore elementPrivateStore,
50 ElementStageStore elementStageStore) {
51 this.versionPublicStore = versionPublicStore;
52 this.versionPrivateStore = versionPrivateStore;
53 this.versionStageStore = versionStageStore;
54 this.elementPublicStore = elementPublicStore;
55 this.elementPrivateStore = elementPrivateStore;
56 this.elementStageStore = elementStageStore;
59 public CollaborationMergeResult sync(SessionContext context, Id itemId, Id versionId,
61 SynchronizationStateEntity publicVersionSyncState =
62 versionPublicStore.getSynchronizationState(context, itemId, versionId)
63 .orElseThrow(() -> new IllegalStateException(
64 String.format(PULL_NON_EXISTING_VERSION, itemId.toString(), versionId.toString())));
66 Date publishTime = publicVersionSyncState.getPublishTime();
68 Optional<SynchronizationStateEntity> privateVersionSyncState =
69 versionPrivateStore.getSynchronizationState(context, itemId, versionId);
71 if (force || !privateVersionSyncState.isPresent() ||
72 !publishTime.equals(privateVersionSyncState.get().getPublishTime())) {
73 ElementContext elementContext =
74 new ElementContext(itemId, versionId, publicVersionSyncState.getRevisionId());
76 Collection<SynchronizationStateEntity> publicSyncStates =
77 elementPublicStore.listSynchronizationStates(context, elementContext);
78 Collection<SynchronizationStateEntity> privateSyncStates =
79 elementPrivateStore.listSynchronizationStates(context, elementContext);
80 Map<Id, SynchronizationStateEntity> publicSyncStateById = toMapById(publicSyncStates);
82 Set<Id> syncedElements = new HashSet<>();
84 List<SynchronizationStateEntity> dirtyPrivateSyncStates = privateSyncStates.stream()
85 .filter(SynchronizationStateEntity::isDirty)
86 .collect(Collectors.toList());
88 forceSyncDirtyElements(context, elementContext, dirtyPrivateSyncStates, publicSyncStateById,
92 if (!privateVersionSyncState.isPresent() ||
93 !publishTime.equals(privateVersionSyncState.get().getPublishTime())) {
94 syncVersion(context, itemId, versionId, publishTime, privateVersionSyncState.isPresent());
95 syncElements(context, elementContext,
96 privateVersionSyncState.map(SynchronizationStateEntity::getPublishTime).orElse(null),
97 publicSyncStates, privateSyncStates, publicSyncStateById, syncedElements);
101 return createResult();
104 private CollaborationMergeResult createResult() {
105 CollaborationMergeResult result = new CollaborationMergeResult();
106 result.setChange(new CollaborationMergeChange());
107 result.setConflict(new CollaborationMergeConflict());
111 private void syncVersion(SessionContext context, Id itemId, Id versionId, Date publishTime,
112 boolean versionExistOnPrivate) {
113 if (versionExistOnPrivate) {
114 stageVersion(context, itemId, new VersionEntity(versionId), Action.UPDATE, publishTime);
116 stageVersion(context, itemId, versionPublicStore.get(context, itemId, versionId)
117 .orElseThrow(() -> new IllegalArgumentException(String
118 .format(PULL_NON_EXISTING_VERSION, itemId.toString(), versionId.toString()))),
119 Action.CREATE, publishTime);
123 private void syncElements(SessionContext context, ElementContext elementContext,
124 Date previousSyncedPublishTime,
125 Collection<SynchronizationStateEntity> publicSyncStates,
126 Collection<SynchronizationStateEntity> privateSyncStates,
127 Map<Id, SynchronizationStateEntity> publicSyncStateById,
128 Set<Id> syncedElements) {
129 Map<Id, SynchronizationStateEntity> privateSyncStateById = toMapById(privateSyncStates);
131 Collection<SynchronizationStateEntity> updatedPublicSyncStates =
132 previousSyncedPublishTime == null
134 : publicSyncStates.stream()
135 .filter(syncState -> syncState.getPublishTime().after(previousSyncedPublishTime))
136 .collect(Collectors.toList());
138 syncPublicUpdatedElements(context, elementContext, updatedPublicSyncStates,
139 publicSyncStateById, privateSyncStateById, syncedElements);
141 List<SynchronizationStateEntity> onlyOnPrivatePublishedSyncStates =
142 privateSyncStates.stream()
143 .filter(syncState -> !publicSyncStateById.containsKey(syncState.getId()) &&
144 syncState.getPublishTime() != null)
145 .collect(Collectors.toList());
147 syncPublicDeletedElements(context, elementContext, onlyOnPrivatePublishedSyncStates,
148 publicSyncStateById, privateSyncStateById, syncedElements);
151 private void syncPublicUpdatedElements(SessionContext context, ElementContext elementContext,
152 Collection<SynchronizationStateEntity> updatedPublicSyncStates,
153 Map<Id, SynchronizationStateEntity> publicSyncStateById,
154 Map<Id, SynchronizationStateEntity> privateSyncStateById,
155 Set<Id> syncedElements) {
156 for (SynchronizationStateEntity publicSyncState : updatedPublicSyncStates) {
157 if (syncedElements.contains(publicSyncState.getId())) {
161 ElementEntity publicElement =
162 elementPublicStore.get(context, elementContext, publicSyncState.getId()).orElseThrow(
163 () -> new IllegalStateException(String
164 .format(PUBLIC_SYNC_STATE_EXISTS_WITHOUT_ELEMENT,
165 elementContext.getItemId().getValue(),
166 elementContext.getVersionId().getValue(),
167 publicSyncState.getId().getValue())));
169 SynchronizationStateEntity privateSyncState =
170 privateSyncStateById.get(publicSyncState.getId());
172 if (privateSyncState != null) {
173 if (!privateSyncState.isDirty()) {
174 // not changed on private
175 stageElement(context, elementContext, publicElement,
176 publicSyncState.getPublishTime(),
177 Action.UPDATE, false, null);
178 syncedElements.add(publicSyncState.getId());
180 Optional<ElementEntity> privateElement =
181 elementPrivateStore.get(context, elementContext, publicSyncState.getId());
183 if (privateElement.isPresent()) {
184 // updated on private - conflict if it has different hash
185 stageElement(context, elementContext, publicElement,
186 publicSyncState.getPublishTime(), Action.UPDATE,
187 !publicElement.getElementHash().equals(privateElement.get().getElementHash()),
190 syncedElements.add(publicSyncState.getId());
192 // deleted on private - conflict tree
193 Set<Id> changeTreeElementIds =
194 stagePublicElementTree(context, elementContext, publicElement, publicSyncStateById,
195 (treeElementIds) -> true);
196 syncedElements.addAll(changeTreeElementIds);
200 // not existing on private - new creation on public
201 Set<Id> changeTreeElementIds =
202 stagePublicElementTree(context, elementContext, publicElement, publicSyncStateById,
203 (treeElementIds) -> containsDirty(treeElementIds, privateSyncStateById));
204 syncedElements.addAll(changeTreeElementIds);
209 private void syncPublicDeletedElements(
210 SessionContext context, ElementContext elementContext,
211 Collection<SynchronizationStateEntity> onlyOnPrivatePublishedSyncStates,
212 Map<Id, SynchronizationStateEntity> publicSyncStateById,
213 Map<Id, SynchronizationStateEntity> privateSyncStateById,
214 Set<Id> syncedElements) {
215 for (SynchronizationStateEntity privateSyncState : onlyOnPrivatePublishedSyncStates) {
216 if (syncedElements.contains(privateSyncState.getId())) {
220 Optional<ElementEntity> privateElement =
221 elementPrivateStore.get(context, elementContext, privateSyncState.getId());
223 if (!privateElement.isPresent()) {
224 // deleted on private as well
225 stageElement(context, elementContext, new ElementEntity(privateSyncState.getId()),
226 null, Action.DELETE, false, null);
227 syncedElements.add(privateSyncState.getId());
229 Set<Id> changeTreeElementIds =
230 stageElementTree(context, elementContext, privateElement.get(),
231 elementPrivateStore, publicSyncStateById::containsKey,
232 (treeElementIds) -> containsDirty(treeElementIds, privateSyncStateById),
233 (elementId) -> null, Action.DELETE);
234 syncedElements.addAll(changeTreeElementIds);
239 private void forceSyncDirtyElements(SessionContext context, ElementContext elementContext,
240 List<SynchronizationStateEntity> dirtyPrivateSyncStates,
241 Map<Id, SynchronizationStateEntity> publicSyncStateById,
242 Set<Id> syncedElements) {
243 for (SynchronizationStateEntity privateSyncState : dirtyPrivateSyncStates) {
244 Optional<ElementEntity> privateElement =
245 elementPrivateStore.get(context, elementContext, privateSyncState.getId());
246 if (privateSyncState.getPublishTime() == null) {
247 stageElement(context, elementContext,
248 privateElement.orElseThrow(() -> new IllegalStateException(
249 String.format(PRIVATE_UNPUBLISHED_SYNC_STATE_EXISTS_WITHOUT_ELEMENT,
250 elementContext.getItemId().getValue(),
251 elementContext.getVersionId().getValue(),
252 privateSyncState.getId().getValue()))),
253 null, Action.DELETE, false, null);
255 SynchronizationStateEntity publicSyncState =
256 publicSyncStateById.get(privateSyncState.getId());
257 if (publicSyncState != null) {
258 ElementEntity publicElement =
259 elementPublicStore.get(context, elementContext, privateSyncState.getId()).orElseThrow(
260 () -> new IllegalStateException(String
261 .format(PUBLIC_SYNC_STATE_EXISTS_WITHOUT_ELEMENT,
262 elementContext.getItemId().getValue(),
263 elementContext.getVersionId().getValue(),
264 privateSyncState.getId().getValue())));
266 stageElement(context, elementContext, publicElement, publicSyncState.getPublishTime(),
267 privateElement.isPresent() ? Action.UPDATE : Action.CREATE, false, null);
269 stageElement(context, elementContext, privateElement.isPresent()
270 ? privateElement.get()
271 : new ElementEntity(privateSyncState.getId()),
272 null, Action.DELETE, false, null);
275 syncedElements.add(privateSyncState.getId());
279 private Set<Id> stagePublicElementTree(SessionContext context,
280 ElementContext elementContext,
281 ElementEntity publicElement,
282 Map<Id, SynchronizationStateEntity> publicSyncStateById,
283 Predicate<Set<Id>> isElementTreeConflicted) {
286 return stageElementTree(context, elementContext, publicElement,
288 (elementId) -> elementPrivateStore.getDescriptor(context, elementContext, elementId)
290 isElementTreeConflicted,
291 (elementId) -> publicSyncStateById.get(elementId).getPublishTime(),
295 private Set<Id> stageElementTree(SessionContext context, ElementContext elementContext,
296 ElementEntity element,
297 ElementStore elementStore,
298 Predicate<Id> isElementExist,
299 Predicate<Set<Id>> isElementTreeConflicted,
300 Function<Id, Date> stagePublishTimeGetter,
301 Action stageAction) {
302 ElementEntity elementTreeRoot = findRootElementOfChange(context, elementContext,
303 elementStore, isElementExist, element);
305 Set<Id> elementTreeIds = new HashSet<>();
306 elementTreeIds.add(elementTreeRoot.getId());
308 Set<Id> subElementIds = stageElementSubs(context, elementContext, elementStore, elementTreeRoot,
309 stagePublishTimeGetter, stageAction);
310 elementTreeIds.addAll(subElementIds);
312 boolean conflicted = isElementTreeConflicted.test(elementTreeIds);
313 stageElement(context, elementContext, elementTreeRoot,
314 stagePublishTimeGetter.apply(elementTreeRoot.getId()), stageAction, conflicted,
315 conflicted ? subElementIds : null);
316 return elementTreeIds;
319 private ElementEntity findRootElementOfChange(SessionContext context,
320 ElementContext elementContext,
321 ElementStore elementStore,
322 Predicate<Id> isElementExistOnOppositeStore,
323 ElementEntity element) {
324 return element.getId().equals(ROOT_ELEMENTS_PARENT_ID) ||
325 isElementExistOnOppositeStore.test(element.getParentId())
327 : findRootElementOfChange(context, elementContext, elementStore,
328 isElementExistOnOppositeStore,
329 elementStore.get(context, elementContext, element.getParentId())
330 .orElseThrow(() -> new IllegalStateException(
331 String.format("Element %s exists while its parent element %s does not",
332 element.getId(), element.getParentId()))));
335 private boolean containsDirty(Set<Id> elementIds,
336 Map<Id, SynchronizationStateEntity> syncStateById) {
337 return elementIds.stream().anyMatch(elementId -> {
338 SynchronizationStateEntity privateSyncState = syncStateById.get(elementId);
339 return privateSyncState != null && privateSyncState.isDirty();
343 private Set<Id> stageElementSubs(SessionContext context, ElementContext elementContext,
344 ElementStore elementStore, ElementEntity parentElement,
345 Function<Id, Date> stagePublishTimeGetter, Action stageAction) {
346 Set<Id> elementTreeIds = new HashSet<>();
347 for (Id elementId : parentElement.getSubElementIds()) {
348 ElementEntity element = elementStore.get(context, elementContext, elementId).get();
350 stageElement(context, elementContext, element, stagePublishTimeGetter.apply(elementId),
351 stageAction, false, null);
353 elementTreeIds.add(elementId);
354 elementTreeIds.addAll(
355 stageElementSubs(context, elementContext, elementStore, element, stagePublishTimeGetter,
358 return elementTreeIds;
361 private void stageElement(SessionContext context, ElementContext elementContext,
362 ElementEntity element, Date publishTime, Action action,
363 boolean conflicted, Set<Id> conflictDependents) {
364 StageEntity<ElementEntity> elementStage =
365 new StageEntity<>(element, publishTime, action, conflicted);
366 if (conflictDependents != null) {
367 elementStage.setConflictDependents(
368 conflictDependents.stream().map(ElementEntity::new).collect(Collectors.toSet()));
370 elementStageStore.create(context, elementContext, elementStage);
373 private void stageVersion(SessionContext context, Id itemId, VersionEntity stageVersion,
374 Action stageAction, Date publishTime) {
376 .create(context, itemId, new StageEntity<>(stageVersion, publishTime, stageAction, false));
379 private Map<Id, SynchronizationStateEntity> toMapById(
380 Collection<SynchronizationStateEntity> syncStates) {
381 return syncStates.stream()
382 .collect(Collectors.toMap(SynchronizationStateEntity::getId, Function.identity()));