2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sparky.sync;
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
25 import java.util.Calendar;
26 import java.util.Collection;
27 import java.util.Date;
28 import java.util.LinkedHashSet;
29 import java.util.TimeZone;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.function.Supplier;
35 import org.onap.aai.cl.api.Logger;
36 import org.onap.aai.cl.eelf.LoggerFactory;
37 import org.onap.aai.sparky.logging.AaiUiMsgs;
38 import org.onap.aai.sparky.sync.config.SyncControllerConfig;
39 import org.onap.aai.sparky.sync.enumeration.OperationState;
40 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
41 import org.onap.aai.sparky.util.NodeUtils;
44 * The Class SyncControllerImpl.
48 public class SyncControllerImpl implements SyncController {
49 private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
52 * The Enum InternalState.
54 private enum InternalState {
55 IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
56 TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
/**
 * Actions that can be requested of the controller, or that are recorded as the cause of an
 * internal state change.
 */
public enum SyncActions {
  SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE,
  SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
}
67 private Collection<IndexSynchronizer> registeredSynchronizers;
68 private Collection<IndexValidator> registeredIndexValidators;
69 private Collection<IndexCleaner> registeredIndexCleaners;
70 private InternalState currentInternalState;
71 private ExecutorService syncControllerExecutor;
72 private ExecutorService statReporterExecutor;
74 private long delayInMs;
75 private long syncFrequencyInMs;
76 private Date syncStartTime;
78 private Date lastExecutionDate;
79 private AtomicInteger runCount;
80 private Semaphore performingActionGate;
81 private Calendar creationTime;
83 private String syncStartTimeWithTimeZone;
84 private String controllerName;
86 protected SyncControllerConfig syncControllerConfig;
/**
 * Instantiates a new sync controller that is not tied to a target entity type.
 *
 * <p>Delegates to the two-argument constructor with a {@code null} target entity type, so the
 * controller name is taken from the configuration unchanged.
 *
 * @param syncControllerConfig the controller configuration
 * @throws Exception the exception
 */
public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
  this(syncControllerConfig, null);
}
// Two-argument constructor: stores the configuration, initialises the timing and bookkeeping
// fields, creates the registration sets, derives the controller name, creates the two named
// executors and starts the state machine in IDLE.
// NOTE(review): the embedded line numbers skip 102-103, 105-106, 130-131 and 133-141, so parts
// of this constructor are not visible here, including the end of the signature and whatever the
// Calendar.getInstance(...) expression on line 132 is assigned to. Confirm against the full file.
101 public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
104 this.syncControllerConfig = syncControllerConfig;
// Default sync interval: 86400000 ms is 24 hours.
107 this.syncFrequencyInMs = 86400000L;
108 this.syncStartTime = null;
109 this.lastExecutionDate = null;
110 this.runCount = new AtomicInteger(0);
// One permit only: performAction() uses tryAcquire() on this gate.
111 this.performingActionGate = new Semaphore(1);
// LinkedHashSet keeps registration order and ignores duplicate registrations.
112 registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
113 registeredIndexValidators = new LinkedHashSet<IndexValidator>();
114 registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
// Local copy of the configured name; it shadows the field until the assignment on line 122.
116 String controllerName = syncControllerConfig.getControllerName();
// When a target entity type is given it is appended to the name in parentheses.
118 if (targetEntityType != null) {
119 controllerName += " (" + targetEntityType + ")";
122 this.controllerName = controllerName;
// Executor names embed the controller name; the sync executor size comes from configuration.
124 this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
125 syncControllerConfig.getNumSyncControllerWorkers(), LOG);
126 this.statReporterExecutor =
127 NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
129 this.currentInternalState = InternalState.IDLE;
// Calendar in the time zone configured for the sync start timestamp.
// NOTE(review): presumably assigned to creationTime on the missing line 131 -- confirm.
132 Calendar.getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
142 * Change internal state.
144 * @param newState the new state
145 * @param causedByAction the caused by action
147 private void changeInternalState(InternalState newState, SyncActions causedByAction) {
148 LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
149 currentInternalState.toString(), newState.toString(), causedByAction.toString());
151 this.currentInternalState = newState;
153 performStateAction();
159 * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
162 public long getDelayInMs() {
167 * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
170 public void setDelayInMs(long delayInMs) {
171 this.delayInMs = delayInMs;
175 * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
178 public long getSyncFrequencyInMs() {
179 return syncFrequencyInMs;
183 * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
186 public void setSyncFrequencyInMs(long syncFrequencyInMs) {
187 this.syncFrequencyInMs = syncFrequencyInMs;
191 * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
194 public Date getSyncStartTime() {
195 return syncStartTime;
199 * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
202 public void setSyncStartTime(Date syncStartTime) {
203 this.syncStartTime = syncStartTime;
207 * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
210 public Date getLastExecutionDate() {
211 return lastExecutionDate;
215 * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
218 public void setLastExecutionDate(Date lastExecutionDate) {
219 this.lastExecutionDate = lastExecutionDate;
223 public String getControllerName() {
224 return controllerName;
// Handles a requested action and reports the outcome as an OperationState.
// The request is only processed while the controller is IDLE; in the other visible path an
// error is logged and IGNORED_SYNC_NOT_IDLE is returned (lines 314-315).
// NOTE(review): several original lines are absent from this listing (for example 235, 243, 246,
// 290, 292-293 and 295-301). They presumably hold the try / case / finally / else scaffolding.
// Confirm against the full file before restructuring this method.
231 public OperationState performAction(SyncActions requestedAction) {
233 if (currentInternalState == InternalState.IDLE) {
238 * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
// NOTE(review): the case label that guards the following code is not visible.
242 switch (requestedAction) {
// tryAcquire() does not block: it returns false when the single permit is already held.
245 if (performingActionGate.tryAcquire()) {
248 long opStartTime = System.currentTimeMillis();
250 LOG.info(AaiUiMsgs.INFO_GENERIC,
251 getControllerName() + " started synchronization at "
252 + SynchronizerConstants.SIMPLE_DATE_FORMAT.format(opStartTime).replaceAll(
253 SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
255 runCount.incrementAndGet();
// changeInternalState() invokes performStateAction(), so the follow-on states are driven from
// inside this call; the end time below is taken after it returns.
257 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
259 long opEndTime = System.currentTimeMillis();
261 long opTime = (opEndTime - opStartTime);
// NOTE(review): the controller name is concatenated into the format string, so a percent sign
// in the name would be interpreted by String.format.
263 String durationMessage =
264 String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
266 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
268 if (syncControllerConfig.isPeriodicSyncEnabled()) {
270 LOG.info(AaiUiMsgs.INFO_GENERIC,
271 getControllerName() + " next sync to begin at " + getNextSyncTime());
// NOTE(review): tz is not used in any of the visible lines.
273 TimeZone tz = TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
// Log an extra message when the run took longer than the configured sync interval.
275 if (opTime > this.getSyncFrequencyInMs()) {
277 String durationWasLongerMessage = String.format(
278 getControllerName() + " synchronization took '%d' ms which is larger than"
279 + " synchronization interval of '%d' ms.",
280 opTime, this.getSyncFrequencyInMs());
282 LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
// Failures during the run are logged (message only) and not rethrown.
286 } catch (Exception syncException) {
287 String message = "An error occurred while performing action = " + requestedAction
288 + ". Error = " + syncException.getMessage();
289 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
// Returns the permit taken by tryAcquire().
// NOTE(review): presumably inside a finally block on the missing line 290 -- confirm.
291 performingActionGate.release();
// NOTE(review): presumably the branch taken when tryAcquire() fails -- confirm.
294 return OperationState.IGNORED_SYNC_NOT_IDLE;
303 return OperationState.OK;
305 } catch (Exception exc) {
306 String message = "An error occurred while performing action = " + requestedAction
307 + ". Error = " + exc.getMessage();
308 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
309 return OperationState.ERROR;
// Controller was not IDLE when the request arrived.
314 LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
315 return OperationState.IGNORED_SYNC_NOT_IDLE;
// Runs the work associated with currentInternalState; called by changeInternalState() after
// every transition. Any exception raised by a handler is caught and logged here.
// NOTE(review): the case labels guarding the calls on lines 332, 336 and 340 are missing from
// this listing, as are lines 354-362 (possibly further cases and a default). Do not restructure
// without the full file.
320 * Perform state action.
322 private void performStateAction() {
325 switch (currentInternalState) {
327 case TEST_INDEX_INTEGRITY:
// The handler itself moves the state machine on to PRE_SYNC.
328 performIndexIntegrityValidation();
// The handler itself moves the state machine on to SYNC_OPERATION.
332 performPreSyncCleanupCollection();
// The handler itself moves the state machine on to POST_SYNC.
336 performSynchronization();
// Unlike the handlers above, the next three handlers do not change state themselves, so the
// transition is issued here.
340 performIndexSyncPostCollection();
341 changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
344 case SELECTIVE_DELETE:
345 performIndexCleanup();
346 changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
349 case GENERATE_FINAL_REPORT:
// true requests the final report from each synchronizer.
351 dumpStatReport(true);
// NOTE(review): line 352 is missing from this listing.
353 changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
// The exception is logged (message only); the sync is not aborted here.
363 } catch (Exception exc) {
365 * Perhaps we should abort the sync on an exception
367 String message = "Caught an error which performing action. Error = " + exc.getMessage();
368 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
373 public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
375 String indexName = entitySynchronizer.getIndexName();
377 if (indexName != null) {
378 registeredSynchronizers.add(entitySynchronizer);
380 String message = "Failed to register entity synchronizer because index name is null";
381 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
387 public void registerIndexValidator(IndexValidator indexValidator) {
389 String indexName = indexValidator.getIndexName();
391 if (indexName != null) {
392 registeredIndexValidators.add(indexValidator);
394 String message = "Failed to register index validator because index name is null";
395 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
401 public void registerIndexCleaner(IndexCleaner indexCleaner) {
403 String indexName = indexCleaner.getIndexName();
405 if (indexName != null) {
406 registeredIndexCleaners.add(indexCleaner);
408 String message = "Failed to register index cleaner because index name is null";
409 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
414 * The state machine should drive our flow: doSync just dispatches an action, and the state
415 * machine determines what is in play and what comes next.
421 * @param showFinalReport the show final report
423 private void dumpStatReport(boolean showFinalReport) {
425 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
427 String statReport = synchronizer.getStatReport(showFinalReport);
429 if (statReport != null) {
430 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
438 private void clearCaches() {
441 * Any entity caches that were built as part of the sync operation should be cleared to save
442 * memory. The original intent of the caching was to provide a short-lived cache to satisfy
443 * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
446 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
447 synchronizer.clearCache();
452 * Perform pre sync cleanup collection.
454 private void performPreSyncCleanupCollection() {
457 * ask the index cleaners to collect the their pre-sync object id collections
460 for (IndexCleaner cleaner : registeredIndexCleaners) {
461 cleaner.populatePreOperationCollection();
464 changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
469 * Perform index sync post collection.
471 private void performIndexSyncPostCollection() {
474 * ask the entity purgers to collect the their pre-sync object id collections
477 for (IndexCleaner cleaner : registeredIndexCleaners) {
478 cleaner.populatePostOperationCollection();
484 * Perform index cleanup.
486 private void performIndexCleanup() {
489 * ask the entity purgers to collect the their pre-sync object id collections
492 for (IndexCleaner cleaner : registeredIndexCleaners) {
493 cleaner.performCleanup();
499 * Perform sync abort.
501 private void performSyncAbort() {
502 changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
506 * Perform index integrity validation.
508 private void performIndexIntegrityValidation() {
511 * loop through registered index validators and test and fix, if needed
514 for (IndexValidator validator : registeredIndexValidators) {
516 if (!validator.exists()) {
517 validator.createOrRepair();
519 } catch (Exception exc) {
520 String message = "Index validator caused an error = " + exc.getMessage();
521 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
525 changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
530 * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
533 public void shutdown() {
535 this.syncControllerExecutor.shutdown();
536 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
539 synchronizer.shutdown();
540 } catch (Exception exc) {
541 LOG.error(AaiUiMsgs.ERROR_GENERIC,
542 "Synchronizer shutdown caused an error = " + exc.getMessage());
546 this.statReporterExecutor.shutdown();
550 * Open question: do we need a running task that responds to a transient boolean so that it can
551 * be killed, or do we just stop the executor that it is in?
// Starts doSync() for every registered synchronizer on syncControllerExecutor, tracks their
// states until allDone is set, then moves the state machine to POST_SYNC.
// NOTE(review): many original lines are missing from this listing (567-570, 572-574, 579-582,
// 592-593, 609-612, 621-623 among others). They presumably include the Supplier method header,
// the error null-check, the wait-loop header, the totalFinished increment and the wait call
// guarded by the InterruptedException handler. Do not restructure without the full file.
557 * Perform synchronization.
559 private void performSynchronization() {
562 * Get all the synchronizers running in parallel
565 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
// One asynchronous task per synchronizer, submitted to the controller executor.
566 supplyAsync(new Supplier<Void>() {
571 synchronizer.doSync();
575 }, this.syncControllerExecutor).whenComplete((result, error) -> {
578 * We don't bother checking the result, because it will always be null as the doSync() is
// A failed doSync() is only logged (message only).
583 LOG.error(AaiUiMsgs.ERROR_GENERIC,
584 "doSync operation failed with an error = " + error.getMessage());
589 boolean allDone = false;
// Periodic stat reports are produced at most every 30000 ms (30 seconds).
590 long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
591 boolean dumpPeriodicStatReport = false;
594 int totalFinished = 0;
596 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
// Only synchronizers that are still running contribute to a periodic report.
597 if (dumpPeriodicStatReport) {
598 if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
599 String statReport = synchronizer.getStatReport(false);
601 if (statReport != null) {
602 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
// IDLE and ABORTED are the two states tested here.
// NOTE(review): the body of this if (lines 609-612) is missing; presumably it counts the
// synchronizer as finished -- confirm.
607 if (synchronizer.getState() == SynchronizerState.IDLE
608 || synchronizer.getState() == SynchronizerState.ABORTED) {
// Arm the report flag for the next pass once the report deadline has passed.
613 if ( System.currentTimeMillis() > nextReportTimeStampInMs) {
614 dumpPeriodicStatReport = true;
615 nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
617 dumpPeriodicStatReport = false;
// Done once every registered synchronizer has been counted as finished.
620 allDone = (totalFinished == registeredSynchronizers.size());
// NOTE(review): the interruption is logged only; consider restoring the interrupt flag with
// Thread.currentThread().interrupt() -- confirm intent.
624 } catch (InterruptedException exc) {
625 LOG.error(AaiUiMsgs.ERROR_GENERIC,
626 "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
631 changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
// Maps the internal state onto the coarser SynchronizerState; the two visible results are IDLE
// and PERFORMING_SYNCHRONIZATION.
// NOTE(review): the case labels are missing from this listing (lines 642-643 and 645-647), so
// which internal states map to which result cannot be confirmed here. Presumably IDLE maps to
// IDLE and everything else to PERFORMING_SYNCHRONIZATION -- verify against the full file.
636 * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
639 public SynchronizerState getState() {
641 switch (currentInternalState) {
644 return SynchronizerState.IDLE;
648 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
656 public Calendar getCreationTime() {
// Returns a textual form of the next sync time; performAction() appends the result to its
// "next sync to begin at" log message.
// NOTE(review): this is an unimplemented auto-generated stub and its return statement is not
// visible in this listing (presumably it returns null) -- confirm and implement.
661 public String getNextSyncTime() {
662 // TODO Auto-generated method stub
667 public boolean isPeriodicSyncEnabled() {
668 return syncControllerConfig.isPeriodicSyncEnabled();
672 public boolean isRunOnceSyncEnabled() {
673 return syncControllerConfig.isRunOnceSyncEnabled();