/**
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * Copyright © 2017 Amdocs
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 */
23 package org.onap.aai.sparky.sync;
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
27 import java.util.Calendar;
28 import java.util.Collection;
29 import java.util.Date;
30 import java.util.LinkedHashSet;
31 import java.util.TimeZone;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Semaphore;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.function.Supplier;
37 import org.onap.aai.cl.api.Logger;
38 import org.onap.aai.cl.eelf.LoggerFactory;
39 import org.onap.aai.sparky.logging.AaiUiMsgs;
40 import org.onap.aai.sparky.sync.config.SyncControllerConfig;
41 import org.onap.aai.sparky.sync.enumeration.OperationState;
42 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
43 import org.onap.aai.sparky.util.NodeUtils;
46 * The Class SyncController.
50 public class SyncControllerImpl implements SyncController {
51 private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
54 * The Enum InternalState.
56 private enum InternalState {
57 IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC, TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
/**
 * Actions that can be requested of the controller, or that are recorded as the cause of an
 * internal state transition.
 *
 * <p>The declaration order is kept as-is so that ordinals do not change.
 */
public enum SyncActions {
  SYNCHRONIZE,
  REPAIR_INDEX,
  INDEX_INTEGRITY_VALIDATION_COMPLETE,
  PRE_SYNC_COMPLETE,
  SYNC_COMPLETE,
  SYNC_ABORTED,
  SYNC_FAILURE,
  POST_SYNC_COMPLETE,
  PURGE_COMPLETE,
  REPORT_COMPLETE
}
67 private Collection<IndexSynchronizer> registeredSynchronizers;
68 private Collection<IndexValidator> registeredIndexValidators;
69 private Collection<IndexCleaner> registeredIndexCleaners;
70 private InternalState currentInternalState;
71 private ExecutorService syncControllerExecutor;
72 private ExecutorService statReporterExecutor;
74 private long delayInMs;
75 private long syncFrequencyInMs;
76 private Date syncStartTime;
78 private Date lastExecutionDate;
79 private AtomicInteger runCount;
80 private Semaphore performingActionGate;
81 private Calendar creationTime;
83 private String syncStartTimeWithTimeZone;
84 private String controllerName;
86 protected SyncControllerConfig syncControllerConfig;
/**
 * Creates a sync controller that is not tied to a specific target entity type.
 *
 * <p>Delegates to the two-argument constructor with a {@code null} target entity type.
 *
 * @param syncControllerConfig the configuration used to set up this controller
 * @throws Exception if the delegated constructor fails
 */
public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
  this(syncControllerConfig, null);
}
100 public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
103 this.syncControllerConfig = syncControllerConfig;
106 this.syncFrequencyInMs = 86400000L;
107 this.syncStartTime = null;
108 this.lastExecutionDate = null;
109 this.runCount = new AtomicInteger(0);
110 this.performingActionGate = new Semaphore(1);
111 registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
112 registeredIndexValidators = new LinkedHashSet<IndexValidator>();
113 registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
115 String controllerName = syncControllerConfig.getControllerName();
117 if (targetEntityType != null) {
118 controllerName += " (" + targetEntityType + ")";
121 this.controllerName = controllerName;
123 this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
124 syncControllerConfig.getNumSyncControllerWorkers(), LOG);
125 this.statReporterExecutor =
126 NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
128 this.currentInternalState = InternalState.IDLE;
130 this.creationTime = Calendar
131 .getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
138 * Change internal state.
140 * @param newState the new state
141 * @param causedByAction the caused by action
143 private void changeInternalState(InternalState newState, SyncActions causedByAction) {
144 LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName, currentInternalState.toString(),
145 newState.toString(), causedByAction.toString());
147 this.currentInternalState = newState;
149 performStateAction();
157 * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
160 public long getDelayInMs() {
167 * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
170 public void setDelayInMs(long delayInMs) {
171 this.delayInMs = delayInMs;
177 * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
180 public long getSyncFrequencyInMs() {
181 return syncFrequencyInMs;
187 * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
190 public void setSyncFrequencyInMs(long syncFrequencyInMs) {
191 this.syncFrequencyInMs = syncFrequencyInMs;
197 * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
200 public Date getSyncStartTime() {
201 return syncStartTime;
207 * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
210 public void setSyncStartTime(Date syncStartTime) {
211 this.syncStartTime = syncStartTime;
217 * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
220 public Date getLastExecutionDate() {
221 return lastExecutionDate;
227 * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
230 public void setLastExecutionDate(Date lastExecutionDate) {
231 this.lastExecutionDate = lastExecutionDate;
235 public String getControllerName() {
236 return controllerName;
242 public OperationState performAction(SyncActions requestedAction) {
244 if (currentInternalState == InternalState.IDLE) {
249 * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
253 switch (requestedAction) {
256 if (performingActionGate.tryAcquire()) {
259 long opStartTime = System.currentTimeMillis();
261 LOG.info(AaiUiMsgs.INFO_GENERIC,
262 getControllerName() + " started synchronization at "
263 + SynchronizerConstants.SIMPLE_DATE_FORMAT.format(opStartTime).replaceAll(
264 SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
266 runCount.incrementAndGet();
268 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
270 long opEndTime = System.currentTimeMillis();
272 long opTime = (opEndTime - opStartTime);
274 String durationMessage =
275 String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
277 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
279 if (syncControllerConfig.isPeriodicSyncEnabled()) {
281 LOG.info(AaiUiMsgs.INFO_GENERIC,
282 getControllerName() + " next sync to begin at " + getNextSyncTime());
285 TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
287 if (opTime > this.getSyncFrequencyInMs()) {
289 String durationWasLongerMessage = String.format(
290 getControllerName() + " synchronization took '%d' ms which is larger than"
291 + " synchronization interval of '%d' ms.",
292 opTime, this.getSyncFrequencyInMs());
294 LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
298 } catch (Exception syncException) {
299 String message = "An error occurred while performing action = " + requestedAction
300 + ". Error = " + syncException.getMessage();
301 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
303 performingActionGate.release();
306 return OperationState.IGNORED_SYNC_NOT_IDLE;
315 return OperationState.OK;
317 } catch (Exception exc) {
318 String message = "An error occurred while performing action = " + requestedAction
319 + ". Error = " + exc.getMessage();
320 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
321 return OperationState.ERROR;
326 LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
327 return OperationState.IGNORED_SYNC_NOT_IDLE;
332 * Perform state action.
334 private void performStateAction() {
337 switch (currentInternalState) {
339 case TEST_INDEX_INTEGRITY:
340 performIndexIntegrityValidation();
344 performPreSyncCleanupCollection();
348 performSynchronization();
352 performIndexSyncPostCollection();
353 changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
356 case SELECTIVE_DELETE:
357 performIndexCleanup();
358 changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
361 case GENERATE_FINAL_REPORT:
363 dumpStatReport(true);
365 changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
375 } catch (Exception exc) {
377 * Perhaps we should abort the sync on an exception
379 String message = "Caught an error which performing action. Error = " + exc.getMessage();
380 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
385 public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
387 String indexName = entitySynchronizer.getIndexName();
389 if (indexName != null) {
390 registeredSynchronizers.add(entitySynchronizer);
392 String message = "Failed to register entity synchronizer because index name is null";
393 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
399 public void registerIndexValidator(IndexValidator indexValidator) {
401 String indexName = indexValidator.getIndexName();
403 if (indexName != null) {
404 registeredIndexValidators.add(indexValidator);
406 String message = "Failed to register index validator because index name is null";
407 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
413 public void registerIndexCleaner(IndexCleaner indexCleaner) {
415 String indexName = indexCleaner.getIndexName();
417 if (indexName != null) {
418 registeredIndexCleaners.add(indexCleaner);
420 String message = "Failed to register index cleaner because index name is null";
421 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
/*
 * The state machine drives the flow: doSync only dispatches an action, and the state machine
 * determines what is in play and what comes next.
 */
433 * @param showFinalReport the show final report
435 private void dumpStatReport(boolean showFinalReport) {
437 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
439 String statReport = synchronizer.getStatReport(showFinalReport);
441 if (statReport != null) {
442 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
450 private void clearCaches() {
453 * Any entity caches that were built as part of the sync operation should be cleared to save
454 * memory. The original intent of the caching was to provide a short-lived cache to satisfy
455 * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
458 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
459 synchronizer.clearCache();
464 * Perform pre sync cleanup collection.
466 private void performPreSyncCleanupCollection() {
469 * ask the index cleaners to collect the their pre-sync object id collections
472 for (IndexCleaner cleaner : registeredIndexCleaners) {
473 cleaner.populatePreOperationCollection();
476 changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
481 * Perform index sync post collection.
483 private void performIndexSyncPostCollection() {
486 * ask the entity purgers to collect the their pre-sync object id collections
489 for (IndexCleaner cleaner : registeredIndexCleaners) {
490 cleaner.populatePostOperationCollection();
496 * Perform index cleanup.
498 private void performIndexCleanup() {
501 * ask the entity purgers to collect the their pre-sync object id collections
504 for (IndexCleaner cleaner : registeredIndexCleaners) {
505 cleaner.performCleanup();
511 * Perform sync abort.
513 private void performSyncAbort() {
514 changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
518 * Perform index integrity validation.
520 private void performIndexIntegrityValidation() {
523 * loop through registered index validators and test and fix, if needed
526 for (IndexValidator validator : registeredIndexValidators) {
528 if (!validator.exists()) {
529 validator.createOrRepair();
531 } catch (Exception exc) {
532 String message = "Index validator caused an error = " + exc.getMessage();
533 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
537 changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
544 * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
547 public void shutdown() {
549 this.syncControllerExecutor.shutdown();
550 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
553 synchronizer.shutdown();
554 } catch (Exception exc) {
555 LOG.error(AaiUiMsgs.ERROR_GENERIC,
556 "Synchronizer shutdown caused an error = " + exc.getMessage());
560 this.statReporterExecutor.shutdown();
/*
 * Open question: do we need some kind of running task that responds to a transient boolean so it
 * can be killed, or do we simply stop the executor that it runs in?
 */
571 * Perform synchronization.
573 private void performSynchronization() {
576 * Get all the synchronizers running in parallel
579 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
580 supplyAsync(new Supplier<Void>() {
585 synchronizer.doSync();
589 }, this.syncControllerExecutor).whenComplete((result, error) -> {
592 * We don't bother checking the result, because it will always be null as the doSync() is
597 LOG.error(AaiUiMsgs.ERROR_GENERIC,
598 "doSync operation failed with an error = " + error.getMessage());
603 boolean allDone = false;
604 long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
605 boolean dumpPeriodicStatReport = false;
608 int totalFinished = 0;
610 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
611 if (dumpPeriodicStatReport) {
612 if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
613 String statReport = synchronizer.getStatReport(false);
615 if (statReport != null) {
616 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
621 if (synchronizer.getState() == SynchronizerState.IDLE
622 || synchronizer.getState() == SynchronizerState.ABORTED) {
627 if (System.currentTimeMillis() > nextReportTimeStampInMs) {
628 dumpPeriodicStatReport = true;
629 nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
631 dumpPeriodicStatReport = false;
634 allDone = (totalFinished == registeredSynchronizers.size());
638 } catch (InterruptedException exc) {
639 LOG.error(AaiUiMsgs.ERROR_GENERIC,
640 "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
645 changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
652 * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
655 public SynchronizerState getState() {
657 switch (currentInternalState) {
660 return SynchronizerState.IDLE;
664 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
672 public Calendar getCreationTime() {
677 public String getNextSyncTime() {
678 // TODO Auto-generated method stub
683 public boolean isPeriodicSyncEnabled() {
684 return syncControllerConfig.isPeriodicSyncEnabled();
688 public boolean isRunOnceSyncEnabled() {
689 return syncControllerConfig.isRunOnceSyncEnabled();