*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*/
-package org.onap.aai.sparky.synchronizer;
+package org.onap.aai.sparky.sync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
+import java.util.Calendar;
import java.util.Collection;
+import java.util.Date;
import java.util.LinkedHashSet;
+import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
-import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
-import org.onap.aai.sparky.util.NodeUtils;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.sparky.logging.AaiUiMsgs;
+import org.onap.aai.sparky.sync.config.SyncControllerConfig;
+import org.onap.aai.sparky.sync.enumeration.OperationState;
+import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
+import org.onap.aai.sparky.util.NodeUtils;
/**
* The Class SyncController.
*
* @author davea.
*/
-public class SyncController {
- private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncController.class);
+public class SyncControllerImpl implements SyncController {
+ private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
/**
* The Enum InternalState.
private InternalState currentInternalState;
private ExecutorService syncControllerExecutor;
private ExecutorService statReporterExecutor;
- private final String controllerName;
+
+ private long delayInMs;
+ private long syncFrequencyInMs;
+ private Date syncStartTime;
+
+ private Date lastExecutionDate;
+ private AtomicInteger runCount;
+ private Semaphore performingActionGate;
+ private Calendar creationTime;
+
+ private String syncStartTimeWithTimeZone;
+ private String controllerName;
+
+ protected SyncControllerConfig syncControllerConfig;
+
+
/**
* Instantiates a new sync controller.
* @param name the name
* @throws Exception the exception
*/
- public SyncController(String name) throws Exception {
+ public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
+ this(syncControllerConfig, null);
+ }
- this.controllerName = name;
- /*
- * Does LHS result in a non-duplicated object collection?? What happens if you double-add an
- * object?
- */
+ public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
+ throws Exception {
+
+ this.syncControllerConfig = syncControllerConfig;
+ this.delayInMs = 0L;
+ this.syncFrequencyInMs = 86400000L;
+ this.syncStartTime = null;
+ this.lastExecutionDate = null;
+ this.runCount = new AtomicInteger(0);
+ this.performingActionGate = new Semaphore(1);
registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
registeredIndexValidators = new LinkedHashSet<IndexValidator>();
registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
- this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController", 5, LOG);
- this.statReporterExecutor = NodeUtils.createNamedExecutor("StatReporter", 1, LOG);
+ String controllerName = syncControllerConfig.getControllerName();
+
+ if (targetEntityType != null) {
+ controllerName += " (" + targetEntityType + ")";
+ }
+
+ this.controllerName = controllerName;
+
+ this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
+ syncControllerConfig.getNumSyncControllerWorkers(), LOG);
+ this.statReporterExecutor =
+ NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
this.currentInternalState = InternalState.IDLE;
+
+ this.creationTime = Calendar
+ .getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
+
}
+
+
/**
* Change internal state.
*
performStateAction();
}
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
+ */
+ @Override
+ public long getDelayInMs() {
+ return delayInMs;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
+ */
+ @Override
+ public void setDelayInMs(long delayInMs) {
+ this.delayInMs = delayInMs;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
+ */
+ @Override
+ public long getSyncFrequencyInMs() {
+ return syncFrequencyInMs;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
+ */
+ @Override
+ public void setSyncFrequencyInMs(long syncFrequencyInMs) {
+ this.syncFrequencyInMs = syncFrequencyInMs;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
+ */
+ @Override
+ public Date getSyncStartTime() {
+ return syncStartTime;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
+ */
+ @Override
+ public void setSyncStartTime(Date syncStartTime) {
+ this.syncStartTime = syncStartTime;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
+ */
+ @Override
+ public Date getLastExecutionDate() {
+ return lastExecutionDate;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
+ */
+ @Override
+ public void setLastExecutionDate(Date lastExecutionDate) {
+ this.lastExecutionDate = lastExecutionDate;
+ }
+
+ @Override
public String getControllerName() {
return controllerName;
}
- /**
- * Perform action.
- *
- * @param requestedAction the requested action
- */
- public void performAction(SyncActions requestedAction) {
+
+
+ @Override
+ public OperationState performAction(SyncActions requestedAction) {
if (currentInternalState == InternalState.IDLE) {
try {
+
+ /*
+ * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
+ * at a time.
+ */
+
switch (requestedAction) {
case SYNCHRONIZE:
- changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
+
+ if (performingActionGate.tryAcquire()) {
+ try {
+
+ long opStartTime = System.currentTimeMillis();
+
+ LOG.info(AaiUiMsgs.INFO_GENERIC,
+ getControllerName() + " started synchronization at "
+ + SynchronizerConstants.SIMPLE_DATE_FORMAT.format(opStartTime).replaceAll(
+ SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
+
+ runCount.incrementAndGet();
+
+ changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
+
+ long opEndTime = System.currentTimeMillis();
+
+ long opTime = (opEndTime - opStartTime);
+
+ String durationMessage =
+ String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
+
+ LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
+
+ if (syncControllerConfig.isPeriodicSyncEnabled()) {
+
+ LOG.info(AaiUiMsgs.INFO_GENERIC,
+ getControllerName() + " next sync to begin at " + getNextSyncTime());
+
+ TimeZone tz =
+ TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
+
+ if (opTime > this.getSyncFrequencyInMs()) {
+
+ String durationWasLongerMessage = String.format(
+ getControllerName() + " synchronization took '%d' ms which is larger than"
+ + " synchronization interval of '%d' ms.",
+ opTime, this.getSyncFrequencyInMs());
+
+ LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
+ }
+ }
+
+ } catch (Exception syncException) {
+ String message = "An error occurred while performing action = " + requestedAction
+ + ". Error = " + syncException.getMessage();
+ LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
+ } finally {
+ performingActionGate.release();
+ }
+ } else {
+ return OperationState.IGNORED_SYNC_NOT_IDLE;
+ }
+
break;
default:
break;
}
+ return OperationState.OK;
+
} catch (Exception exc) {
String message = "An error occurred while performing action = " + requestedAction
+ ". Error = " + exc.getMessage();
LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
+ return OperationState.ERROR;
+ } finally {
+
}
} else {
LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
+ return OperationState.IGNORED_SYNC_NOT_IDLE;
}
}
break;
}
} catch (Exception exc) {
+ /*
+ * Perhaps we should abort the sync on an exception
+ */
String message = "Caught an error which performing action. Error = " + exc.getMessage();
LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
}
}
- /**
- * Register entity synchronizer.
- *
- * @param entitySynchronizer the entity synchronizer
- */
+ @Override
public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
String indexName = entitySynchronizer.getIndexName();
}
- /**
- * Register index validator.
- *
- * @param indexValidator the index validator
- */
+ @Override
public void registerIndexValidator(IndexValidator indexValidator) {
String indexName = indexValidator.getIndexName();
}
- /**
- * Register index cleaner.
- *
- * @param indexCleaner the index cleaner
- */
+ @Override
public void registerIndexCleaner(IndexCleaner indexCleaner) {
String indexName = indexCleaner.getIndexName();
}
- /**
- * Shutdown.
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
*/
+ @Override
public void shutdown() {
this.syncControllerExecutor.shutdown();
boolean dumpPeriodicStatReport = false;
while (!allDone) {
-
int totalFinished = 0;
for (IndexSynchronizer synchronizer : registeredSynchronizers) {
if (dumpPeriodicStatReport) {
- if (synchronizer.getState() != SynchronizerState.IDLE) {
+ if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
String statReport = synchronizer.getStatReport(false);
+
if (statReport != null) {
LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
}
}
- if (synchronizer.getState() == SynchronizerState.IDLE) {
- totalFinished++;
- }
+ }
+
+ if (synchronizer.getState() == SynchronizerState.IDLE
+ || synchronizer.getState() == SynchronizerState.ABORTED) {
+ totalFinished++;
}
}
+
if (System.currentTimeMillis() > nextReportTimeStampInMs) {
dumpPeriodicStatReport = true;
nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
} else {
dumpPeriodicStatReport = false;
}
+
allDone = (totalFinished == registeredSynchronizers.size());
try {
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
+ */
+ @Override
public SynchronizerState getState() {
switch (currentInternalState) {
}
+ @Override
+ public Calendar getCreationTime() {
+ return creationTime;
+ }
+
+ @Override
+ public String getNextSyncTime() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isPeriodicSyncEnabled() {
+ return syncControllerConfig.isPeriodicSyncEnabled();
+ }
+
+ @Override
+ public boolean isRunOnceSyncEnabled() {
+ return syncControllerConfig.isRunOnceSyncEnabled();
+ }
+
}