2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sparky.sync;
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
25 import java.util.Calendar;
26 import java.util.Collection;
27 import java.util.Date;
28 import java.util.LinkedHashSet;
29 import java.util.TimeZone;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.function.Supplier;
35 import org.onap.aai.cl.api.Logger;
36 import org.onap.aai.cl.eelf.LoggerFactory;
37 import org.onap.aai.sparky.logging.AaiUiMsgs;
38 import org.onap.aai.sparky.sync.config.SyncControllerConfig;
39 import org.onap.aai.sparky.sync.enumeration.OperationState;
40 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
41 import org.onap.aai.sparky.util.NodeUtils;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.stereotype.Component;
46 * The Class SyncController.
50 public class SyncControllerImpl implements SyncController {
51 private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
54 * The Enum InternalState.
56 private enum InternalState {
57 IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
58 TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
62 * The Enum SyncActions.
64 public enum SyncActions {
65 SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE,
66 SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
69 private Collection<IndexSynchronizer> registeredSynchronizers;
70 private Collection<IndexValidator> registeredIndexValidators;
71 private Collection<IndexCleaner> registeredIndexCleaners;
72 private InternalState currentInternalState;
73 private ExecutorService syncControllerExecutor;
74 private ExecutorService statReporterExecutor;
76 private long delayInMs;
77 private long syncFrequencyInMs;
78 private Date syncStartTime;
80 private Date lastExecutionDate;
81 private AtomicInteger runCount;
82 private Semaphore performingActionGate;
83 private Calendar creationTime;
85 private String syncStartTimeWithTimeZone;
86 private String controllerName;
88 protected SyncControllerConfig syncControllerConfig;
94 * Instantiates a new sync controller.
96 * @param name the name
97 * @throws Exception the exception
99 public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
100 this(syncControllerConfig,null);
103 public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
106 this.syncControllerConfig = syncControllerConfig;
109 this.syncFrequencyInMs = 86400000L;
110 this.syncStartTime = null;
111 this.lastExecutionDate = null;
112 this.runCount = new AtomicInteger(0);
113 this.performingActionGate = new Semaphore(1);
114 registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
115 registeredIndexValidators = new LinkedHashSet<IndexValidator>();
116 registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
118 String controllerName = syncControllerConfig.getControllerName();
120 if (targetEntityType != null) {
121 controllerName += " (" + targetEntityType + ")";
124 this.controllerName = controllerName;
126 this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
127 syncControllerConfig.getNumSyncControllerWorkers(), LOG);
128 this.statReporterExecutor =
129 NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
131 this.currentInternalState = InternalState.IDLE;
134 Calendar.getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
144 * Change internal state.
146 * @param newState the new state
147 * @param causedByAction the caused by action
149 private void changeInternalState(InternalState newState, SyncActions causedByAction) {
150 LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
151 currentInternalState.toString(), newState.toString(), causedByAction.toString());
153 this.currentInternalState = newState;
155 performStateAction();
161 * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
164 public long getDelayInMs() {
169 * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
172 public void setDelayInMs(long delayInMs) {
173 this.delayInMs = delayInMs;
177 * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
180 public long getSyncFrequencyInMs() {
181 return syncFrequencyInMs;
185 * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
188 public void setSyncFrequencyInMs(long syncFrequencyInMs) {
189 this.syncFrequencyInMs = syncFrequencyInMs;
193 * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
196 public Date getSyncStartTime() {
197 return syncStartTime;
201 * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
204 public void setSyncStartTime(Date syncStartTime) {
205 this.syncStartTime = syncStartTime;
209 * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
212 public Date getLastExecutionDate() {
213 return lastExecutionDate;
217 * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
220 public void setLastExecutionDate(Date lastExecutionDate) {
221 this.lastExecutionDate = lastExecutionDate;
225 public String getControllerName() {
226 return controllerName;
233 public OperationState performAction(SyncActions requestedAction) {
235 if (currentInternalState == InternalState.IDLE) {
240 * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
244 switch (requestedAction) {
247 if (performingActionGate.tryAcquire()) {
250 long opStartTime = System.currentTimeMillis();
252 LOG.info(AaiUiMsgs.INFO_GENERIC,
253 getControllerName() + " started synchronization at "
254 + SynchronizerConstants.SIMPLE_DATE_FORMAT.format(opStartTime).replaceAll(
255 SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
257 runCount.incrementAndGet();
259 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
261 long opEndTime = System.currentTimeMillis();
263 long opTime = (opEndTime - opStartTime);
265 String durationMessage =
266 String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
268 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
270 if (syncControllerConfig.isPeriodicSyncEnabled()) {
272 LOG.info(AaiUiMsgs.INFO_GENERIC,
273 getControllerName() + " next sync to begin at " + getNextSyncTime());
275 TimeZone tz = TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
277 if (opTime > this.getSyncFrequencyInMs()) {
279 String durationWasLongerMessage = String.format(
280 getControllerName() + " synchronization took '%d' ms which is larger than"
281 + " synchronization interval of '%d' ms.",
282 opTime, this.getSyncFrequencyInMs());
284 LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
288 } catch (Exception syncException) {
289 String message = "An error occurred while performing action = " + requestedAction
290 + ". Error = " + syncException.getMessage();
291 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
293 performingActionGate.release();
296 return OperationState.IGNORED_SYNC_NOT_IDLE;
305 return OperationState.OK;
307 } catch (Exception exc) {
308 String message = "An error occurred while performing action = " + requestedAction
309 + ". Error = " + exc.getMessage();
310 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
311 return OperationState.ERROR;
316 LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
317 return OperationState.IGNORED_SYNC_NOT_IDLE;
322 * Perform state action.
324 private void performStateAction() {
327 switch (currentInternalState) {
329 case TEST_INDEX_INTEGRITY:
330 performIndexIntegrityValidation();
334 performPreSyncCleanupCollection();
338 performSynchronization();
342 performIndexSyncPostCollection();
343 changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
346 case SELECTIVE_DELETE:
347 performIndexCleanup();
348 changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
351 case GENERATE_FINAL_REPORT:
353 dumpStatReport(true);
355 changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
365 } catch (Exception exc) {
367 * Perhaps we should abort the sync on an exception
369 String message = "Caught an error which performing action. Error = " + exc.getMessage();
370 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
375 public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
377 String indexName = entitySynchronizer.getIndexName();
379 if (indexName != null) {
380 registeredSynchronizers.add(entitySynchronizer);
382 String message = "Failed to register entity synchronizer because index name is null";
383 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
389 public void registerIndexValidator(IndexValidator indexValidator) {
391 String indexName = indexValidator.getIndexName();
393 if (indexName != null) {
394 registeredIndexValidators.add(indexValidator);
396 String message = "Failed to register index validator because index name is null";
397 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
403 public void registerIndexCleaner(IndexCleaner indexCleaner) {
405 String indexName = indexCleaner.getIndexName();
407 if (indexName != null) {
408 registeredIndexCleaners.add(indexCleaner);
410 String message = "Failed to register index cleaner because index name is null";
411 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
416 * State machine should drive our flow dosync just dispatches an action and the state machine
417 * determines what is in play and what is next
423 * @param showFinalReport the show final report
425 private void dumpStatReport(boolean showFinalReport) {
427 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
429 String statReport = synchronizer.getStatReport(showFinalReport);
431 if (statReport != null) {
432 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
440 private void clearCaches() {
443 * Any entity caches that were built as part of the sync operation should be cleared to save
444 * memory. The original intent of the caching was to provide a short-lived cache to satisfy
445 * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
448 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
449 synchronizer.clearCache();
454 * Perform pre sync cleanup collection.
456 private void performPreSyncCleanupCollection() {
459 * ask the index cleaners to collect the their pre-sync object id collections
462 for (IndexCleaner cleaner : registeredIndexCleaners) {
463 cleaner.populatePreOperationCollection();
466 changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
471 * Perform index sync post collection.
473 private void performIndexSyncPostCollection() {
476 * ask the entity purgers to collect the their pre-sync object id collections
479 for (IndexCleaner cleaner : registeredIndexCleaners) {
480 cleaner.populatePostOperationCollection();
486 * Perform index cleanup.
488 private void performIndexCleanup() {
491 * ask the entity purgers to collect the their pre-sync object id collections
494 for (IndexCleaner cleaner : registeredIndexCleaners) {
495 cleaner.performCleanup();
501 * Perform sync abort.
503 private void performSyncAbort() {
504 changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
508 * Perform index integrity validation.
510 private void performIndexIntegrityValidation() {
513 * loop through registered index validators and test and fix, if needed
516 for (IndexValidator validator : registeredIndexValidators) {
518 if (!validator.exists()) {
519 validator.createOrRepair();
521 } catch (Exception exc) {
522 String message = "Index validator caused an error = " + exc.getMessage();
523 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
527 changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
532 * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
535 public void shutdown() {
537 this.syncControllerExecutor.shutdown();
538 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
541 synchronizer.shutdown();
542 } catch (Exception exc) {
543 LOG.error(AaiUiMsgs.ERROR_GENERIC,
544 "Synchronizer shutdown caused an error = " + exc.getMessage());
548 this.statReporterExecutor.shutdown();
552 * Need some kind of task running that responds to a transient boolean to kill it or we just stop
553 * the executor that it is in?
559 * Perform synchronization.
561 private void performSynchronization() {
564 * Get all the synchronizers running in parallel
567 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
568 supplyAsync(new Supplier<Void>() {
573 synchronizer.doSync();
577 }, this.syncControllerExecutor).whenComplete((result, error) -> {
580 * We don't bother checking the result, because it will always be null as the doSync() is
585 LOG.error(AaiUiMsgs.ERROR_GENERIC,
586 "doSync operation failed with an error = " + error.getMessage());
591 boolean allDone = false;
592 long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
593 boolean dumpPeriodicStatReport = false;
596 int totalFinished = 0;
598 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
599 if (dumpPeriodicStatReport) {
600 if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
601 String statReport = synchronizer.getStatReport(false);
603 if (statReport != null) {
604 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
609 if (synchronizer.getState() == SynchronizerState.IDLE
610 || synchronizer.getState() == SynchronizerState.ABORTED) {
615 if ( System.currentTimeMillis() > nextReportTimeStampInMs) {
616 dumpPeriodicStatReport = true;
617 nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
619 dumpPeriodicStatReport = false;
622 allDone = (totalFinished == registeredSynchronizers.size());
626 } catch (InterruptedException exc) {
627 LOG.error(AaiUiMsgs.ERROR_GENERIC,
628 "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
633 changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
638 * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
641 public SynchronizerState getState() {
643 switch (currentInternalState) {
646 return SynchronizerState.IDLE;
650 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
658 public Calendar getCreationTime() {
663 public String getNextSyncTime() {
664 // TODO Auto-generated method stub
669 public boolean isPeriodicSyncEnabled() {
670 return syncControllerConfig.isPeriodicSyncEnabled();
674 public boolean isRunOnceSyncEnabled() {
675 return syncControllerConfig.isRunOnceSyncEnabled();