2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.openecomp.sparky.synchronizer;
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
27 import java.util.Collection;
28 import java.util.LinkedHashSet;
29 import java.util.concurrent.ExecutorService;
30 import java.util.function.Supplier;
32 import org.openecomp.cl.api.Logger;
33 import org.openecomp.cl.eelf.LoggerFactory;
34 import org.openecomp.sparky.logging.AaiUiMsgs;
35 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
36 import org.openecomp.sparky.util.NodeUtils;
39 * The Class SyncController.
43 public class SyncController {
44 private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncController.class);
47 * The Enum InternalState.
49 private enum InternalState {
50 IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
51 TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
/**
 * Actions that can be requested through {@code performAction(SyncActions)} or recorded as the
 * cause of an internal state change.
 */
public enum SyncActions {
  SYNCHRONIZE,
  REPAIR_INDEX,
  INDEX_INTEGRITY_VALIDATION_COMPLETE,
  PRE_SYNC_COMPLETE,
  SYNC_COMPLETE,
  SYNC_ABORTED,
  SYNC_FAILURE,
  POST_SYNC_COMPLETE,
  PURGE_COMPLETE,
  REPORT_COMPLETE
}
62 private Collection<IndexSynchronizer> registeredSynchronizers;
63 private Collection<IndexValidator> registeredIndexValidators;
64 private Collection<IndexCleaner> registeredIndexCleaners;
65 private InternalState currentInternalState;
66 private ExecutorService syncControllerExecutor;
67 private ExecutorService statReporterExecutor;
68 private final String controllerName;
/**
 * Creates an idle controller with empty registries and its two named executors.
 *
 * @param name the controller name, later returned by {@code getControllerName()}
 * @throws Exception declared by the original signature; kept for caller compatibility
 */
public SyncController(String name) throws Exception {
  this.controllerName = name;
  this.currentInternalState = InternalState.IDLE;

  /*
   * LinkedHashSet keeps insertion order and, being a Set, ignores a second add of an element
   * that is equal (per equals/hashCode) to one already present.
   */
  this.registeredSynchronizers = new LinkedHashSet<>();
  this.registeredIndexValidators = new LinkedHashSet<>();
  this.registeredIndexCleaners = new LinkedHashSet<>();

  // Five threads for the sync work, a single thread for stat reporting.
  this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController", 5, LOG);
  this.statReporterExecutor = NodeUtils.createNamedExecutor("StatReporter", 1, LOG);
}
95 * Change internal state.
97 * @param newState the new state
98 * @param causedByAction the caused by action
100 private void changeInternalState(InternalState newState, SyncActions causedByAction) {
101 LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
102 currentInternalState.toString(), newState.toString(), causedByAction.toString());
104 this.currentInternalState = newState;
106 performStateAction();
109 public String getControllerName() {
110 return controllerName;
116 * @param requestedAction the requested action
// Handles an externally requested action. In the visible code a request is only acted on
// while the controller is IDLE, and the one visible transition moves it to
// TEST_INDEX_INTEGRITY.
// NOTE(review): this copy of the method has lost its short lines (`try {`, the case
// label(s), `break;`, `} else {`, closing braces). Restore them from version control before
// changing the logic.
118 public void performAction(SyncActions requestedAction) {
120 if (currentInternalState == InternalState.IDLE) {
// NOTE(review): the case label that selects the transition below is not visible here;
// presumably SyncActions.SYNCHRONIZE -- confirm against the original file.
123 switch (requestedAction) {
125 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
// Only the exception's message text is logged; the exception object itself is not handed to
// the logger.
132 } catch (Exception exc) {
133 String message = "An error occurred while performing action = " + requestedAction
134 + ". Error = " + exc.getMessage();
135 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
// NOTE(review): presumably the else-branch of the IDLE check above -- the request is
// rejected and the current state is logged. Confirm once the missing `} else {` is restored.
138 LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
143 * Perform state action.
// Runs the work associated with the current internal state. changeInternalState() calls this
// method, and the steps below call changeInternalState() in turn, so the states are walked
// through nested, synchronous calls.
// NOTE(review): this copy has lost its short lines (`try {`, several case labels, `break;`,
// `default:`, closing braces and any short statements). Restore them before editing.
145 private void performStateAction() {
148 switch (currentInternalState) {
150 case TEST_INDEX_INTEGRITY:
151 performIndexIntegrityValidation();
// NOTE(review): the case labels for the next three calls are not visible; presumably
// PRE_SYNC, SYNC_OPERATION and POST_SYNC respectively -- confirm.
155 performPreSyncCleanupCollection();
159 performSynchronization();
163 performIndexSyncPostCollection();
164 changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
167 case SELECTIVE_DELETE:
168 performIndexCleanup();
169 changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
172 case GENERATE_FINAL_REPORT:
// Emit the final statistics report, then return the controller to IDLE.
174 dumpStatReport(true);
176 changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
// Only the exception's message text is logged; the exception object itself is not handed to
// the logger.
186 } catch (Exception exc) {
187 String message = "Caught an error which performing action. Error = " + exc.getMessage();
188 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
193 * Register entity synchronizer.
195 * @param entitySynchronizer the entity synchronizer
197 public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
199 String indexName = entitySynchronizer.getIndexName();
201 if (indexName != null) {
202 registeredSynchronizers.add(entitySynchronizer);
204 String message = "Failed to register entity synchronizer because index name is null";
205 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
211 * Register index validator.
213 * @param indexValidator the index validator
215 public void registerIndexValidator(IndexValidator indexValidator) {
217 String indexName = indexValidator.getIndexName();
219 if (indexName != null) {
220 registeredIndexValidators.add(indexValidator);
222 String message = "Failed to register index validator because index name is null";
223 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
229 * Register index cleaner.
231 * @param indexCleaner the index cleaner
233 public void registerIndexCleaner(IndexCleaner indexCleaner) {
235 String indexName = indexCleaner.getIndexName();
237 if (indexName != null) {
238 registeredIndexCleaners.add(indexCleaner);
240 String message = "Failed to register index cleaner because index name is null";
241 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
246 * State machine should drive our flow dosync just dispatches an action and the state machine
247 * determines what is in play and what is next
253 * @param showFinalReport the show final report
255 private void dumpStatReport(boolean showFinalReport) {
257 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
259 String statReport = synchronizer.getStatReport(showFinalReport);
261 if (statReport != null) {
262 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
270 private void clearCaches() {
273 * Any entity caches that were built as part of the sync operation should be cleared to save
274 * memory. The original intent of the caching was to provide a short-lived cache to satisfy
275 * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
278 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
279 synchronizer.clearCache();
284 * Perform pre sync cleanup collection.
286 private void performPreSyncCleanupCollection() {
289 * ask the index cleaners to collect the their pre-sync object id collections
292 for (IndexCleaner cleaner : registeredIndexCleaners) {
293 cleaner.populatePreOperationCollection();
296 changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
301 * Perform index sync post collection.
303 private void performIndexSyncPostCollection() {
306 * ask the entity purgers to collect the their pre-sync object id collections
309 for (IndexCleaner cleaner : registeredIndexCleaners) {
310 cleaner.populatePostOperationCollection();
316 * Perform index cleanup.
318 private void performIndexCleanup() {
321 * ask the entity purgers to collect the their pre-sync object id collections
324 for (IndexCleaner cleaner : registeredIndexCleaners) {
325 cleaner.performCleanup();
331 * Perform sync abort.
333 private void performSyncAbort() {
334 changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
338 * Perform index integrity validation.
340 private void performIndexIntegrityValidation() {
343 * loop through registered index validators and test and fix, if needed
346 for (IndexValidator validator : registeredIndexValidators) {
348 if (!validator.exists()) {
349 validator.createOrRepair();
351 } catch (Exception exc) {
352 String message = "Index validator caused an error = " + exc.getMessage();
353 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
357 changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
364 public void shutdown() {
366 this.syncControllerExecutor.shutdown();
367 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
370 synchronizer.shutdown();
371 } catch (Exception exc) {
372 LOG.error(AaiUiMsgs.ERROR_GENERIC,
373 "Synchronizer shutdown caused an error = " + exc.getMessage());
377 this.statReporterExecutor.shutdown();
381 * Need some kind of task running that responds to a transient boolean to kill it or we just stop
382 * the executor that it is in?
388 * Perform synchronization.
// Starts doSync() for every registered synchronizer on syncControllerExecutor, then monitors
// the synchronizers' states until all of them are counted as finished, and finally moves the
// state machine to POST_SYNC.
// NOTE(review): this copy has lost its short lines (the Supplier's get() signature and
// return, the `if (error != null)` guard, the wait-loop header, the finished-counter
// increment, the sleep call, `} else {` and closing braces). Restore them before editing.
390 private void performSynchronization() {
393 * Get all the synchronizers running in parallel
396 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
397 supplyAsync(new Supplier<Void>() {
402 synchronizer.doSync();
// The completion callback is only used to log a failure of the asynchronous task.
406 }, this.syncControllerExecutor).whenComplete((result, error) -> {
409 * We don't bother checking the result, because it will always be null as the doSync() is
414 LOG.error(AaiUiMsgs.ERROR_GENERIC,
415 "doSync operation failed with an error = " + error.getMessage());
// Monitoring state: a periodic stat report becomes due 30000 ms after the previous one.
420 boolean allDone = false;
421 long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
422 boolean dumpPeriodicStatReport = false;
// Counter compared against the number of registered synchronizers further down.
426 int totalFinished = 0;
428 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
// When a periodic report is due, log the report of each synchronizer that is not IDLE.
429 if (dumpPeriodicStatReport) {
430 if (synchronizer.getState() != SynchronizerState.IDLE) {
431 String statReport = synchronizer.getStatReport(false);
432 if (statReport != null) {
433 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
// NOTE(review): the closing braces are missing, so it cannot be told from this copy whether
// the IDLE check below is nested inside `if (dumpPeriodicStatReport)`. If it is, finished
// synchronizers are only counted on report ticks -- verify against the original file.
436 if (synchronizer.getState() == SynchronizerState.IDLE) {
// Arm the periodic report for the next pass once the deadline has passed, and push the
// deadline out by another 30000 ms; otherwise disarm it.
441 if ( System.currentTimeMillis() > nextReportTimeStampInMs) {
442 dumpPeriodicStatReport = true;
443 nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
445 dumpPeriodicStatReport = false;
// Done once every registered synchronizer has been counted as finished.
447 allDone = (totalFinished == registeredSynchronizers.size());
// NOTE(review): the interruption is only logged in the visible lines; no call restoring the
// thread's interrupt status is visible. Consider Thread.currentThread().interrupt() -- confirm
// the intended behavior first.
451 } catch (InterruptedException exc) {
452 LOG.error(AaiUiMsgs.ERROR_GENERIC,
453 "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
458 changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
462 public SynchronizerState getState() {
464 switch (currentInternalState) {
467 return SynchronizerState.IDLE;
471 return SynchronizerState.PERFORMING_SYNCHRONIZATION;