2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sparky.sync;
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
25 import java.text.SimpleDateFormat;
26 import java.util.Calendar;
27 import java.util.Collection;
28 import java.util.Date;
29 import java.util.LinkedHashSet;
30 import java.util.TimeZone;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.function.Supplier;
36 import org.onap.aai.cl.api.Logger;
37 import org.onap.aai.cl.eelf.LoggerFactory;
38 import org.onap.aai.sparky.logging.AaiUiMsgs;
39 import org.onap.aai.sparky.sync.config.SyncControllerConfig;
40 import org.onap.aai.sparky.sync.enumeration.OperationState;
41 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
42 import org.onap.aai.sparky.util.NodeUtils;
43 import org.springframework.beans.factory.annotation.Autowired;
44 import org.springframework.stereotype.Component;
47 * The Class SyncController.
51 public class SyncControllerImpl implements SyncController {
52 private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
55 * The Enum InternalState.
57 private enum InternalState {
58 IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
59 TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
62 private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
65 * The Enum SyncActions.
67 public enum SyncActions {
68 SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE,
69 SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
72 private Collection<IndexSynchronizer> registeredSynchronizers;
73 private Collection<IndexValidator> registeredIndexValidators;
74 private Collection<IndexCleaner> registeredIndexCleaners;
75 private InternalState currentInternalState;
76 private ExecutorService syncControllerExecutor;
77 private ExecutorService statReporterExecutor;
79 private long delayInMs;
80 private long syncFrequencyInMs;
81 private Date syncStartTime;
83 private Date lastExecutionDate;
84 private AtomicInteger runCount;
85 private Semaphore performingActionGate;
86 private Calendar creationTime;
88 private String syncStartTimeWithTimeZone;
89 private String controllerName;
91 protected SyncControllerConfig syncControllerConfig;
97 * Instantiates a new sync controller.
99 * @throws Exception the exception
101 public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
102 this(syncControllerConfig,null);
105 public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
108 this.syncControllerConfig = syncControllerConfig;
111 this.syncFrequencyInMs = 86400000L;
112 this.syncStartTime = null;
113 this.lastExecutionDate = null;
114 this.runCount = new AtomicInteger(0);
115 this.performingActionGate = new Semaphore(1);
116 registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
117 registeredIndexValidators = new LinkedHashSet<IndexValidator>();
118 registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
120 String controllerName = syncControllerConfig.getControllerName();
122 if (targetEntityType != null) {
123 controllerName += " (" + targetEntityType + ")";
126 this.controllerName = controllerName;
128 this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
129 syncControllerConfig.getNumSyncControllerWorkers(), LOG);
130 this.statReporterExecutor =
131 NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
133 this.currentInternalState = InternalState.IDLE;
136 Calendar.getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
146 * Change internal state.
148 * @param newState the new state
149 * @param causedByAction the caused by action
151 private void changeInternalState(InternalState newState, SyncActions causedByAction) {
152 LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
153 currentInternalState.toString(), newState.toString(), causedByAction.toString());
155 this.currentInternalState = newState;
157 performStateAction();
163 * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
166 public long getDelayInMs() {
171 * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
174 public void setDelayInMs(long delayInMs) {
175 this.delayInMs = delayInMs;
179 * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
182 public long getSyncFrequencyInMs() {
183 return syncFrequencyInMs;
187 * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
190 public void setSyncFrequencyInMs(long syncFrequencyInMs) {
191 this.syncFrequencyInMs = syncFrequencyInMs;
195 * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
198 public Date getSyncStartTime() {
199 return syncStartTime;
203 * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
206 public void setSyncStartTime(Date syncStartTime) {
207 this.syncStartTime = syncStartTime;
211 * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
214 public Date getLastExecutionDate() {
215 return lastExecutionDate;
219 * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
222 public void setLastExecutionDate(Date lastExecutionDate) {
223 this.lastExecutionDate = lastExecutionDate;
227 public String getControllerName() {
228 return controllerName;
235 public OperationState performAction(SyncActions requestedAction) {
237 if (currentInternalState == InternalState.IDLE) {
242 * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
246 switch (requestedAction) {
249 if (performingActionGate.tryAcquire()) {
252 long opStartTime = System.currentTimeMillis();
254 LOG.info(AaiUiMsgs.INFO_GENERIC,
255 getControllerName() + " started synchronization at "
256 + this.simpleDateFormat.format(opStartTime).replaceAll(
257 SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
259 runCount.incrementAndGet();
261 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
263 long opEndTime = System.currentTimeMillis();
265 long opTime = (opEndTime - opStartTime);
267 String durationMessage =
268 String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
270 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
272 if (syncControllerConfig.isPeriodicSyncEnabled()) {
274 LOG.info(AaiUiMsgs.INFO_GENERIC,
275 getControllerName() + " next sync to begin at " + getNextSyncTime());
277 TimeZone tz = TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
279 if (opTime > this.getSyncFrequencyInMs()) {
281 String durationWasLongerMessage = String.format(
282 getControllerName() + " synchronization took '%d' ms which is larger than"
283 + " synchronization interval of '%d' ms.",
284 opTime, this.getSyncFrequencyInMs());
286 LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
290 } catch (Exception syncException) {
291 String message = "An error occurred while performing action = " + requestedAction
292 + ". Error = " + syncException.getMessage();
293 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
295 performingActionGate.release();
298 return OperationState.IGNORED_SYNC_NOT_IDLE;
307 return OperationState.OK;
309 } catch (Exception exc) {
310 String message = "An error occurred while performing action = " + requestedAction
311 + ". Error = " + exc.getMessage();
312 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
313 return OperationState.ERROR;
318 LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
319 return OperationState.IGNORED_SYNC_NOT_IDLE;
324 * Perform state action.
326 private void performStateAction() {
329 switch (currentInternalState) {
331 case TEST_INDEX_INTEGRITY:
332 performIndexIntegrityValidation();
336 performPreSyncCleanupCollection();
340 performSynchronization();
344 performIndexSyncPostCollection();
345 changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
348 case SELECTIVE_DELETE:
349 performIndexCleanup();
350 changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
353 case GENERATE_FINAL_REPORT:
355 dumpStatReport(true);
357 changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
367 } catch (Exception exc) {
369 * Perhaps we should abort the sync on an exception
371 String message = "Caught an error which performing action. Error = " + exc.getMessage();
372 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
377 public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
379 String indexName = entitySynchronizer.getIndexName();
381 if (indexName != null) {
382 registeredSynchronizers.add(entitySynchronizer);
384 String message = "Failed to register entity synchronizer because index name is null";
385 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
391 public void registerIndexValidator(IndexValidator indexValidator) {
393 String indexName = indexValidator.getIndexName();
395 if (indexName != null) {
396 registeredIndexValidators.add(indexValidator);
398 String message = "Failed to register index validator because index name is null";
399 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
405 public void registerIndexCleaner(IndexCleaner indexCleaner) {
407 String indexName = indexCleaner.getIndexName();
409 if (indexName != null) {
410 registeredIndexCleaners.add(indexCleaner);
412 String message = "Failed to register index cleaner because index name is null";
413 LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
418 * State machine should drive our flow dosync just dispatches an action and the state machine
419 * determines what is in play and what is next
425 * @param showFinalReport the show final report
427 private void dumpStatReport(boolean showFinalReport) {
429 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
431 String statReport = synchronizer.getStatReport(showFinalReport);
433 if (statReport != null) {
434 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
442 private void clearCaches() {
445 * Any entity caches that were built as part of the sync operation should be cleared to save
446 * memory. The original intent of the caching was to provide a short-lived cache to satisfy
447 * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
450 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
451 synchronizer.clearCache();
456 * Perform pre sync cleanup collection.
458 private void performPreSyncCleanupCollection() {
461 * ask the index cleaners to collect the their pre-sync object id collections
464 for (IndexCleaner cleaner : registeredIndexCleaners) {
465 cleaner.populatePreOperationCollection();
468 changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
473 * Perform index sync post collection.
475 private void performIndexSyncPostCollection() {
478 * ask the entity purgers to collect the their pre-sync object id collections
481 for (IndexCleaner cleaner : registeredIndexCleaners) {
482 cleaner.populatePostOperationCollection();
488 * Perform index cleanup.
490 private void performIndexCleanup() {
493 * ask the entity purgers to collect the their pre-sync object id collections
496 for (IndexCleaner cleaner : registeredIndexCleaners) {
497 cleaner.performCleanup();
503 * Perform sync abort.
505 private void performSyncAbort() {
506 changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
510 * Perform index integrity validation.
512 private void performIndexIntegrityValidation() {
515 * loop through registered index validators and test and fix, if needed
518 for (IndexValidator validator : registeredIndexValidators) {
520 if (!validator.exists()) {
521 validator.createOrRepair();
523 } catch (Exception exc) {
524 String message = "Index validator caused an error = " + exc.getMessage();
525 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
529 changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
534 * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
537 public void shutdown() {
539 this.syncControllerExecutor.shutdown();
540 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
543 synchronizer.shutdown();
544 } catch (Exception exc) {
545 LOG.error(AaiUiMsgs.ERROR_GENERIC,
546 "Synchronizer shutdown caused an error = " + exc.getMessage());
550 this.statReporterExecutor.shutdown();
554 * Need some kind of task running that responds to a transient boolean to kill it or we just stop
555 * the executor that it is in?
561 * Perform synchronization.
563 private void performSynchronization() {
566 * Get all the synchronizers running in parallel
569 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
570 supplyAsync(new Supplier<Void>() {
575 synchronizer.doSync();
579 }, this.syncControllerExecutor).whenComplete((result, error) -> {
582 * We don't bother checking the result, because it will always be null as the doSync() is
587 LOG.error(AaiUiMsgs.ERROR_GENERIC,
588 "doSync operation failed with an error = " + error.getMessage());
593 boolean allDone = false;
594 long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
595 boolean dumpPeriodicStatReport = false;
598 int totalFinished = 0;
600 for (IndexSynchronizer synchronizer : registeredSynchronizers) {
601 if (dumpPeriodicStatReport) {
602 if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
603 String statReport = synchronizer.getStatReport(false);
605 if (statReport != null) {
606 LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
611 if (synchronizer.getState() == SynchronizerState.IDLE
612 || synchronizer.getState() == SynchronizerState.ABORTED) {
617 if ( System.currentTimeMillis() > nextReportTimeStampInMs) {
618 dumpPeriodicStatReport = true;
619 nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
621 dumpPeriodicStatReport = false;
624 allDone = (totalFinished == registeredSynchronizers.size());
628 } catch (InterruptedException exc) {
629 LOG.error(AaiUiMsgs.ERROR_GENERIC,
630 "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
631 Thread.currentThread().interrupt();
636 changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
641 * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
644 public SynchronizerState getState() {
646 switch (currentInternalState) {
649 return SynchronizerState.IDLE;
653 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
661 public Calendar getCreationTime() {
666 public String getNextSyncTime() {
667 // TODO Auto-generated method stub
672 public boolean isPeriodicSyncEnabled() {
673 return syncControllerConfig.isPeriodicSyncEnabled();
677 public boolean isRunOnceSyncEnabled() {
678 return syncControllerConfig.isRunOnceSyncEnabled();