2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
26 package org.openecomp.sparky.synchronizer;
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
30 import java.util.Collection;
31 import java.util.LinkedHashSet;
32 import java.util.concurrent.ExecutorService;
33 import java.util.function.Supplier;
35 import org.openecomp.cl.api.Logger;
36 import org.openecomp.cl.eelf.LoggerFactory;
37 import org.openecomp.sparky.logging.AaiUiMsgs;
38 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
39 import org.openecomp.sparky.util.NodeUtils;
42 * The Class SyncController.
46 public class SyncController {
47 private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncController.class);
50 * The Enum InternalState.
52 private enum InternalState {
53 IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
54 TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
/**
 * The Enum SyncActions.
 *
 * <p>Actions that are either requested through {@code performAction} or recorded as the cause of
 * an internal state change.
 */
public enum SyncActions {
  SYNCHRONIZE,
  REPAIR_INDEX,
  INDEX_INTEGRITY_VALIDATION_COMPLETE,
  PRE_SYNC_COMPLETE,
  SYNC_COMPLETE,
  SYNC_ABORTED,
  SYNC_FAILURE,
  POST_SYNC_COMPLETE,
  PURGE_COMPLETE,
  REPORT_COMPLETE
}
65 private Collection<IndexSynchronizer> registeredSynchronizers;
66 private Collection<IndexValidator> registeredIndexValidators;
67 private Collection<IndexCleaner> registeredIndexCleaners;
68 private InternalState currentInternalState;
69 private ExecutorService syncControllerExecutor;
70 private ExecutorService statReporterExecutor;
71 private final String controllerName;
/**
 * Instantiates a new sync controller.
 *
 * <p>The controller starts in the {@code IDLE} state with no registered synchronizers,
 * validators or cleaners.
 *
 * @param name the name reported by {@link #getControllerName()} and in state-change log entries
 * @throws Exception the exception
 */
public SyncController(String name) throws Exception {
  this.controllerName = name;
  this.currentInternalState = InternalState.IDLE;

  /*
   * LinkedHashSet keeps insertion order and leaves the set unchanged when an element that is
   * already present (per equals/hashCode) is added again, so double-adding an object does not
   * produce a duplicate entry.
   */
  this.registeredSynchronizers = new LinkedHashSet<>();
  this.registeredIndexValidators = new LinkedHashSet<>();
  this.registeredIndexCleaners = new LinkedHashSet<>();

  // NOTE(review): the numeric argument is presumably the executor's thread count - confirm
  // against NodeUtils.createNamedExecutor.
  this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController", 5, LOG);
  this.statReporterExecutor = NodeUtils.createNamedExecutor("StatReporter", 1, LOG);
}
98 * Change internal state.
100 * @param newState the new state
101 * @param causedByAction the caused by action
103 private void changeInternalState(InternalState newState, SyncActions causedByAction) {
104 LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
105 currentInternalState.toString(), newState.toString(), causedByAction.toString());
107 this.currentInternalState = newState;
109 performStateAction();
112 public String getControllerName() {
113 return controllerName;
119 * @param requestedAction the requested action
121 public void performAction(SyncActions requestedAction) {
123 if (currentInternalState == InternalState.IDLE) {
126 switch (requestedAction) {
128 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
135 } catch (Exception exc) {
136 String message = "An error occurred while performing action = " + requestedAction
137 + ". Error = " + exc.getMessage();
138 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
141 LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
146 * Perform state action.
148 private void performStateAction() {
151 switch (currentInternalState) {
153 case TEST_INDEX_INTEGRITY:
154 performIndexIntegrityValidation();
158 performPreSyncCleanupCollection();
162 performSynchronization();
166 performIndexSyncPostCollection();
167 changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
170 case SELECTIVE_DELETE:
171 performIndexCleanup();
172 changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
175 case GENERATE_FINAL_REPORT:
177 dumpStatReport(true);
179 changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
189 } catch (Exception exc) {
190 String message = "Caught an error which performing action. Error = " + exc.getMessage();
191 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
196 * Register entity synchronizer.
198 * @param entitySynchronizer the entity synchronizer
200 public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
202 String indexName = entitySynchronizer.getIndexName();
204 if (indexName != null) {
205 registeredSynchronizers.add(entitySynchronizer);
207 String message = "Failed to register entity synchronizer because index name is null";
208 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
214 * Register index validator.
216 * @param indexValidator the index validator
218 public void registerIndexValidator(IndexValidator indexValidator) {
220 String indexName = indexValidator.getIndexName();
222 if (indexName != null) {
223 registeredIndexValidators.add(indexValidator);
225 String message = "Failed to register index validator because index name is null";
226 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
232 * Register index cleaner.
234 * @param indexCleaner the index cleaner
236 public void registerIndexCleaner(IndexCleaner indexCleaner) {
238 String indexName = indexCleaner.getIndexName();
240 if (indexName != null) {
241 registeredIndexCleaners.add(indexCleaner);
243 String message = "Failed to register index cleaner because index name is null";
244 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
249 * The state machine should drive our flow: doSync just dispatches an action, and the state
250 * machine determines what is in play and what is next.
256 * @param showFinalReport the show final report
258 private void dumpStatReport(boolean showFinalReport) {
260 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
262 String statReport = synchronizer.getStatReport(showFinalReport);
264 if (statReport != null) {
265 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
273 private void clearCaches() {
276 * Any entity caches that were built as part of the sync operation should be cleared to save
277 * memory. The original intent of the caching was to provide a short-lived cache to satisfy
278 * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
281 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
282 synchronizer.clearCache();
287 * Perform pre sync cleanup collection.
289 private void performPreSyncCleanupCollection() {
292 * ask the index cleaners to collect the their pre-sync object id collections
295 for (IndexCleaner cleaner : registeredIndexCleaners) {
296 cleaner.populatePreOperationCollection();
299 changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
304 * Perform index sync post collection.
306 private void performIndexSyncPostCollection() {
309 * ask the entity purgers to collect the their pre-sync object id collections
312 for (IndexCleaner cleaner : registeredIndexCleaners) {
313 cleaner.populatePostOperationCollection();
319 * Perform index cleanup.
321 private void performIndexCleanup() {
324 * ask the entity purgers to collect the their pre-sync object id collections
327 for (IndexCleaner cleaner : registeredIndexCleaners) {
328 cleaner.performCleanup();
334 * Perform sync abort.
336 private void performSyncAbort() {
337 changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
341 * Perform index integrity validation.
343 private void performIndexIntegrityValidation() {
346 * loop through registered index validators and test and fix, if needed
349 for (IndexValidator validator : registeredIndexValidators) {
351 if (!validator.exists()) {
352 validator.createOrRepair();
354 } catch (Exception exc) {
355 String message = "Index validator caused an error = " + exc.getMessage();
356 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
360 changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
367 public void shutdown() {
369 this.syncControllerExecutor.shutdown();
370 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
373 synchronizer.shutdown();
374 } catch (Exception exc) {
375 LOG.error(AaiUiMsgs.ERROR_GENERIC,
376 "Synchronizer shutdown caused an error = " + exc.getMessage());
380 this.statReporterExecutor.shutdown();
384 * Do we need some kind of running task that responds to a transient boolean in order to kill
385 * it, or do we just stop the executor that it is in?
391 * Perform synchronization.
393 private void performSynchronization() {
396 * Get all the synchronizers running in parallel
399 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
400 supplyAsync(new Supplier<Void>() {
405 synchronizer.doSync();
409 }, this.syncControllerExecutor).whenComplete((result, error) -> {
412 * We don't bother checking the result, because it will always be null as the doSync() is
417 LOG.error(AaiUiMsgs.ERROR_GENERIC,
418 "doSync operation failed with an error = " + error.getMessage());
423 boolean allDone = false;
424 long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
430 int totalFinished = 0;
432 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
433 if (System.currentTimeMillis() > nextReportTimeStampInMs) {
435 nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
437 String statReport = synchronizer.getStatReport(false);
439 if (statReport != null) {
440 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
444 if (synchronizer.getState() == SynchronizerState.IDLE) {
449 allDone = (totalFinished == registeredSynchronizers.size());
453 } catch (InterruptedException exc) {
454 LOG.error(AaiUiMsgs.ERROR_GENERIC,
455 "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
460 changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
464 public SynchronizerState getState() {
466 switch (currentInternalState) {
469 return SynchronizerState.IDLE;
473 return SynchronizerState.PERFORMING_SYNCHRONIZATION;