2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
25 package org.onap.aai.sparky.sync;
27 import static java.util.concurrent.CompletableFuture.supplyAsync;
29 import java.util.Calendar;
30 import java.util.Collection;
31 import java.util.Date;
32 import java.util.LinkedHashSet;
33 import java.util.TimeZone;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Semaphore;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.function.Supplier;
39 import org.onap.aai.cl.api.Logger;
40 import org.onap.aai.cl.eelf.LoggerFactory;
41 import org.onap.aai.sparky.logging.AaiUiMsgs;
42 import org.onap.aai.sparky.sync.config.SyncControllerConfig;
43 import org.onap.aai.sparky.sync.enumeration.OperationState;
44 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
45 import org.onap.aai.sparky.util.NodeUtils;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.stereotype.Component;
50 * The Class SyncControllerImpl, a state-machine driven implementation of {@link SyncController}.
54 public class SyncControllerImpl implements SyncController {
55 private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
58 * The Enum InternalState.
60 private enum InternalState {
61 IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
62 TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
66 * The Enum SyncActions.
/**
 * Actions that can be requested of the controller, or that are reported as the cause of an
 * internal state change.
 *
 * <p>The declaration order is kept exactly as before so that ordinals do not change.
 */
public enum SyncActions {
  SYNCHRONIZE,
  REPAIR_INDEX,
  INDEX_INTEGRITY_VALIDATION_COMPLETE,
  PRE_SYNC_COMPLETE,
  SYNC_COMPLETE,
  SYNC_ABORTED,
  SYNC_FAILURE,
  POST_SYNC_COMPLETE,
  PURGE_COMPLETE,
  REPORT_COMPLETE
}
// Synchronizers added through registerEntitySynchronizer(); the constructor backs this with a
// LinkedHashSet, so iteration follows registration order.
73 private Collection<IndexSynchronizer> registeredSynchronizers;
// Validators added through registerIndexValidator(); consulted by performIndexIntegrityValidation().
74 private Collection<IndexValidator> registeredIndexValidators;
// Cleaners added through registerIndexCleaner(); used before and after the sync and for cleanup.
75 private Collection<IndexCleaner> registeredIndexCleaners;
// Current state of the state machine; starts as IDLE and is only written by changeInternalState().
76 private InternalState currentInternalState;
// Executor on which each synchronizer's doSync() is started.
77 private ExecutorService syncControllerExecutor;
// Single-worker executor; in the code visible here it is only created and shut down.
78 private ExecutorService statReporterExecutor;
// Plain accessor-backed value; not otherwise read in the code visible here.
80 private long delayInMs;
// Defaults to 86400000 ms (24 hours) in the constructor; compared against the sync duration.
81 private long syncFrequencyInMs;
// Initialised to null in the constructor; changed only through its setter.
82 private Date syncStartTime;
// Initialised to null in the constructor; changed only through its setter.
84 private Date lastExecutionDate;
// Incremented once for every synchronization that is started.
85 private AtomicInteger runCount;
// One-permit semaphore acquired without blocking so that only one synchronization runs at a time.
86 private Semaphore performingActionGate;
// NOTE(review): presumably the controller's creation timestamp; the assignment is only partly
// visible in this extract - confirm.
87 private Calendar creationTime;
// NOTE(review): not referenced anywhere in the code visible here.
89 private String syncStartTimeWithTimeZone;
// Configured controller name, suffixed with " (<targetEntityType>)" when a target type is given.
90 private String controllerName;
// Configuration supplied at construction time.
92 protected SyncControllerConfig syncControllerConfig;
98 * Instantiates a new sync controller.
100 * @param syncControllerConfig the sync controller configuration
101 * @throws Exception the exception
/**
 * Instantiates a new sync controller without a target entity type.
 *
 * <p>Delegates to the two-argument constructor, passing {@code null} as the target entity type.
 *
 * @param syncControllerConfig the sync controller configuration
 * @throws Exception if the delegated constructor fails
 */
public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
  this(syncControllerConfig, null);
}
107 public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
110 this.syncControllerConfig = syncControllerConfig;
113 this.syncFrequencyInMs = 86400000L;
114 this.syncStartTime = null;
115 this.lastExecutionDate = null;
116 this.runCount = new AtomicInteger(0);
117 this.performingActionGate = new Semaphore(1);
118 registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
119 registeredIndexValidators = new LinkedHashSet<IndexValidator>();
120 registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
122 String controllerName = syncControllerConfig.getControllerName();
124 if (targetEntityType != null) {
125 controllerName += " (" + targetEntityType + ")";
128 this.controllerName = controllerName;
130 this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
131 syncControllerConfig.getNumSyncControllerWorkers(), LOG);
132 this.statReporterExecutor =
133 NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
135 this.currentInternalState = InternalState.IDLE;
138 Calendar.getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
148 * Change internal state.
150 * @param newState the new state
151 * @param causedByAction the caused by action
153 private void changeInternalState(InternalState newState, SyncActions causedByAction) {
154 LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
155 currentInternalState.toString(), newState.toString(), causedByAction.toString());
157 this.currentInternalState = newState;
159 performStateAction();
165 * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
168 public long getDelayInMs() {
173 * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
176 public void setDelayInMs(long delayInMs) {
177 this.delayInMs = delayInMs;
181 * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
184 public long getSyncFrequencyInMs() {
185 return syncFrequencyInMs;
189 * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
192 public void setSyncFrequencyInMs(long syncFrequencyInMs) {
193 this.syncFrequencyInMs = syncFrequencyInMs;
197 * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
200 public Date getSyncStartTime() {
201 return syncStartTime;
205 * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
208 public void setSyncStartTime(Date syncStartTime) {
209 this.syncStartTime = syncStartTime;
213 * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
216 public Date getLastExecutionDate() {
217 return lastExecutionDate;
221 * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
224 public void setLastExecutionDate(Date lastExecutionDate) {
225 this.lastExecutionDate = lastExecutionDate;
229 public String getControllerName() {
230 return controllerName;
237 public OperationState performAction(SyncActions requestedAction) {
239 if (currentInternalState == InternalState.IDLE) {
244 * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
248 switch (requestedAction) {
251 if (performingActionGate.tryAcquire()) {
254 long opStartTime = System.currentTimeMillis();
256 LOG.info(AaiUiMsgs.INFO_GENERIC,
257 getControllerName() + " started synchronization at "
258 + SynchronizerConstants.SIMPLE_DATE_FORMAT.format(opStartTime).replaceAll(
259 SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
261 runCount.incrementAndGet();
263 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
265 long opEndTime = System.currentTimeMillis();
267 long opTime = (opEndTime - opStartTime);
269 String durationMessage =
270 String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
272 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
274 if (syncControllerConfig.isPeriodicSyncEnabled()) {
276 LOG.info(AaiUiMsgs.INFO_GENERIC,
277 getControllerName() + " next sync to begin at " + getNextSyncTime());
279 TimeZone tz = TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
281 if (opTime > this.getSyncFrequencyInMs()) {
283 String durationWasLongerMessage = String.format(
284 getControllerName() + " synchronization took '%d' ms which is larger than"
285 + " synchronization interval of '%d' ms.",
286 opTime, this.getSyncFrequencyInMs());
288 LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
292 } catch (Exception syncException) {
293 String message = "An error occurred while performing action = " + requestedAction
294 + ". Error = " + syncException.getMessage();
295 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
297 performingActionGate.release();
300 return OperationState.IGNORED_SYNC_NOT_IDLE;
309 return OperationState.OK;
311 } catch (Exception exc) {
312 String message = "An error occurred while performing action = " + requestedAction
313 + ". Error = " + exc.getMessage();
314 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
315 return OperationState.ERROR;
320 LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
321 return OperationState.IGNORED_SYNC_NOT_IDLE;
326 * Perform state action.
328 private void performStateAction() {
331 switch (currentInternalState) {
333 case TEST_INDEX_INTEGRITY:
334 performIndexIntegrityValidation();
338 performPreSyncCleanupCollection();
342 performSynchronization();
346 performIndexSyncPostCollection();
347 changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
350 case SELECTIVE_DELETE:
351 performIndexCleanup();
352 changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
355 case GENERATE_FINAL_REPORT:
357 dumpStatReport(true);
359 changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
369 } catch (Exception exc) {
371 * Perhaps we should abort the sync on an exception
373 String message = "Caught an error which performing action. Error = " + exc.getMessage();
374 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
379 public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
381 String indexName = entitySynchronizer.getIndexName();
383 if (indexName != null) {
384 registeredSynchronizers.add(entitySynchronizer);
386 String message = "Failed to register entity synchronizer because index name is null";
387 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
393 public void registerIndexValidator(IndexValidator indexValidator) {
395 String indexName = indexValidator.getIndexName();
397 if (indexName != null) {
398 registeredIndexValidators.add(indexValidator);
400 String message = "Failed to register index validator because index name is null";
401 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
407 public void registerIndexCleaner(IndexCleaner indexCleaner) {
409 String indexName = indexCleaner.getIndexName();
411 if (indexName != null) {
412 registeredIndexCleaners.add(indexCleaner);
414 String message = "Failed to register index cleaner because index name is null";
415 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
420 * The state machine should drive our flow: doSync just dispatches an action, and the state
421 * machine determines what is in play and what is next.
427 * @param showFinalReport the show final report
429 private void dumpStatReport(boolean showFinalReport) {
431 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
433 String statReport = synchronizer.getStatReport(showFinalReport);
435 if (statReport != null) {
436 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
444 private void clearCaches() {
447 * Any entity caches that were built as part of the sync operation should be cleared to save
448 * memory. The original intent of the caching was to provide a short-lived cache to satisfy
449 * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
452 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
453 synchronizer.clearCache();
458 * Perform pre sync cleanup collection.
460 private void performPreSyncCleanupCollection() {
463 * ask the index cleaners to collect the their pre-sync object id collections
466 for (IndexCleaner cleaner : registeredIndexCleaners) {
467 cleaner.populatePreOperationCollection();
470 changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
475 * Perform index sync post collection.
477 private void performIndexSyncPostCollection() {
480 * ask the entity purgers to collect the their pre-sync object id collections
483 for (IndexCleaner cleaner : registeredIndexCleaners) {
484 cleaner.populatePostOperationCollection();
490 * Perform index cleanup.
492 private void performIndexCleanup() {
495 * ask the entity purgers to collect the their pre-sync object id collections
498 for (IndexCleaner cleaner : registeredIndexCleaners) {
499 cleaner.performCleanup();
505 * Perform sync abort.
507 private void performSyncAbort() {
508 changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
512 * Perform index integrity validation.
514 private void performIndexIntegrityValidation() {
517 * loop through registered index validators and test and fix, if needed
520 for (IndexValidator validator : registeredIndexValidators) {
522 if (!validator.exists()) {
523 validator.createOrRepair();
525 } catch (Exception exc) {
526 String message = "Index validator caused an error = " + exc.getMessage();
527 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
531 changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
536 * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
539 public void shutdown() {
541 this.syncControllerExecutor.shutdown();
542 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
545 synchronizer.shutdown();
546 } catch (Exception exc) {
547 LOG.error(AaiUiMsgs.ERROR_GENERIC,
548 "Synchronizer shutdown caused an error = " + exc.getMessage());
552 this.statReporterExecutor.shutdown();
556 * Need some kind of task running that responds to a transient boolean to kill it or we just stop
557 * the executor that it is in?
563 * Perform synchronization.
// Starts doSync() for every registered synchronizer on syncControllerExecutor, then repeatedly
// inspects the synchronizer states, and finally moves the state machine to POST_SYNC.
// NOTE(review): several lines of this method (loop header, counter increment, the wait call
// in front of the InterruptedException handler, closing braces) are not visible in this
// extract - confirm against the complete file before changing the logic.
565 private void performSynchronization() {
568 * Get all the synchronizers running in parallel
571 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
// One asynchronous task per synchronizer, submitted to the controller's own executor.
572 supplyAsync(new Supplier<Void>() {
577 synchronizer.doSync();
581 }, this.syncControllerExecutor).whenComplete((result, error) -> {
584 * We don't bother checking the result, because it will always be null as the doSync() is
// Failures of the asynchronous task are only logged.
589 LOG.error(AaiUiMsgs.ERROR_GENERIC,
590 "doSync operation failed with an error = " + error.getMessage());
595 boolean allDone = false;
// The next periodic stat report is due 30000 ms from now.
596 long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
597 boolean dumpPeriodicStatReport = false;
// Counts the synchronizers found in state IDLE or ABORTED during one pass.
600 int totalFinished = 0;
602 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
// When a report is due, only synchronizers that are still synchronizing are reported.
603 if (dumpPeriodicStatReport) {
604 if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
605 String statReport = synchronizer.getStatReport(false);
607 if (statReport != null) {
608 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
// A synchronizer counts as finished once it is IDLE or ABORTED.
613 if (synchronizer.getState() == SynchronizerState.IDLE
614 || synchronizer.getState() == SynchronizerState.ABORTED) {
// The report flag is raised for the next pass once the deadline has passed, and the deadline
// is pushed out by another 30000 ms; otherwise the flag is cleared.
619 if ( System.currentTimeMillis() > nextReportTimeStampInMs) {
620 dumpPeriodicStatReport = true;
621 nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
623 dumpPeriodicStatReport = false;
// Done when every registered synchronizer was counted as finished in this pass.
626 allDone = (totalFinished == registeredSynchronizers.size());
// NOTE(review): the interruption is only logged; the thread's interrupt status is not restored.
630 } catch (InterruptedException exc) {
631 LOG.error(AaiUiMsgs.ERROR_GENERIC,
632 "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
// Hand over to the post-sync step of the state machine.
637 changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
642 * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
645 public SynchronizerState getState() {
647 switch (currentInternalState) {
650 return SynchronizerState.IDLE;
654 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
662 public Calendar getCreationTime() {
667 public String getNextSyncTime() {
668 // TODO Auto-generated method stub
673 public boolean isPeriodicSyncEnabled() {
674 return syncControllerConfig.isPeriodicSyncEnabled();
678 public boolean isRunOnceSyncEnabled() {
679 return syncControllerConfig.isRunOnceSyncEnabled();