From: Joey Sullivan Date: Wed, 27 Sep 2017 23:02:31 +0000 (+0000) Subject: serializing OAM async task X-Git-Tag: v1.2.0~48 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=e5aad808db5203acec496eed43b5785fc0640d90;p=appc.git serializing OAM async task Change-Id: I0c98636c165a2cc5b9915a3950ab64744e6328c7 Issue-Id: APPC-244 Signed-off-by: Joey Sullivan --- diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseActionRunnable.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseActionRunnable.java index 542e53d63..b118eb0d1 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseActionRunnable.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseActionRunnable.java @@ -45,9 +45,6 @@ import java.util.concurrent.Future; *
- finalState */ public abstract class BaseActionRunnable extends BaseCommon implements Runnable { - final String OAM_OPERATION_TIMEOUT_SECOND = "appc.OAM.api.timeout"; - /** Default operation tiemout set to 1 minute */ - final int DEFAULT_OAM_OPERATION_TIMEOUT = 60; /** Abort due to rejection message format with flexible operation name */ final String ABORT_MESSAGE_FORMAT = "Aborting %s operation due to %s."; /** Timeout message format with flexible operation name */ @@ -58,7 +55,6 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable final String DUE_TO_EXECUTION_ERROR = "due to execution error."; private boolean isWaiting = false; - private AppcOamStates currentState; long startTimeMs = 0; long timeoutMs = 0; boolean doTimeoutChecking = false; @@ -80,32 +76,25 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable rpc = parent.rpc; commonHeader = parent.commonHeader; - startTime = parent.startTime; myParent = parent; - setTimeoutValues(); } /** - * Set timeout in milliseconds + * Collect the timeout value for this {@link BaseActionRunnable} */ void setTimeoutValues() { - Integer timeoutSeconds = myParent.timeoutSeconds; - if (timeoutSeconds == null) { - timeoutMs = configurationHelper.getConfig().getIntegerProperty( - OAM_OPERATION_TIMEOUT_SECOND, DEFAULT_OAM_OPERATION_TIMEOUT) * 1000; - } else { - timeoutMs = timeoutSeconds.longValue() * 1000; - } - + startTime = myParent.startTime; + timeoutMs = myParent.getTimeoutMilliseconds(); doTimeoutChecking = timeoutMs != 0; if (doTimeoutChecking) { startTimeMs = startTime.getTime(); } logDebug("%s action runnable check timeout (%s) with timeout (%d)ms, and startMs (%d)", - rpc.name(), Boolean.toString(doTimeoutChecking), timeoutMs, startTimeMs); + rpc.name(), Boolean.toString(doTimeoutChecking), timeoutMs, startTimeMs); } + /** * Abort operation handling due to outside interruption, does
* - set ABORT status
@@ -114,7 +103,7 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable * * @param newRpc of the new AppcOam.RPC operation. */ - public void abortRunnable(final AppcOam.RPC newRpc) { + void abortRunnable(final AppcOam.RPC newRpc) { resetLogProperties(false); String additionalMsg = String.format(NEW_RPC_OPERATION_REQUEST, newRpc); @@ -131,7 +120,7 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable try { setInitialLogProperties(); logDebug(String.format("===========in %s run (waiting: %s)=======", - actionName, Boolean.toString(isWaiting))); + actionName, Boolean.toString(isWaiting))); if (isWaiting) { if (!checkState()) { @@ -159,7 +148,7 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable */ void keepWaiting() { logDebug(String.format("%s runnable waiting, current state is %s.", - actionName, currentState == null ? "null" : currentState.toString())); + actionName, stateHelper.getCurrentOamState())); isTimeout("keepWaiting"); } @@ -173,9 +162,9 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable boolean isTimeout(String parentName) { logDebug(String.format("%s task isTimeout called from %s", actionName, parentName)); if (doTimeoutChecking - && System.currentTimeMillis() - startTimeMs > timeoutMs) { + && System.currentTimeMillis() - startTimeMs > timeoutMs) { logger.error(String.format("%s operation timeout (%d) ms has reached, abort with error state.", - actionName, timeoutMs)); + actionName, timeoutMs)); setStatus(OAMCommandStatus.TIMEOUT, String.format(TIMEOUT_MESSAGE_FORMAT, rpc.name(), timeoutMs)); postAction(AppcOamStates.Error); @@ -253,8 +242,7 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable return true; } - currentState = stateHelper.getBundlesState(); - if (currentState == finalState) { + if (stateHelper.getBundlesState() == finalState) { setStatus(OAMCommandStatus.SUCCESS); postDoAction(true); return true; diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseCommon.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseCommon.java index ccb57305a..9c28007b4 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseCommon.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseCommon.java @@ -53,7 +53,6 @@ import static com.att.eelf.configuration.Configuration.MDC_INSTANCE_UUID; import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID; import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN; import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS; -import static com.att.eelf.configuration.Configuration.MDC_SERVICE_INSTANCE_ID; import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME; /** @@ -61,7 +60,7 @@ import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME; * - BaseProcessor (for REST sync handling)
* - BaseActionRunnable (for REST async handling) */ -abstract class BaseCommon { +public abstract class BaseCommon { final EELFLogger logger; final ConfigurationHelper configurationHelper; final StateHelper stateHelper; @@ -74,15 +73,14 @@ abstract class BaseCommon { CommonHeader commonHeader; private final List MDC_KEYS = Arrays.asList( - LoggingConstants.MDCKeys.PARTNER_NAME, - LoggingConstants.MDCKeys.SERVER_NAME, - MDC_INSTANCE_UUID, - MDC_KEY_REQUEST_ID, - MDC_SERVER_FQDN, - MDC_SERVER_IP_ADDRESS, - MDC_SERVICE_NAME + LoggingConstants.MDCKeys.PARTNER_NAME, + LoggingConstants.MDCKeys.SERVER_NAME, + MDC_INSTANCE_UUID, + MDC_KEY_REQUEST_ID, + MDC_SERVER_FQDN, + MDC_SERVER_IP_ADDRESS, + MDC_SERVICE_NAME ); - private Map oldMdcContent = new HashMap<>(); /** @@ -110,19 +108,19 @@ abstract class BaseCommon { void auditInfoLog(Msg msg) { LoggingUtils.auditInfo(startTime.toInstant(), new Date(System.currentTimeMillis()).toInstant(), - String.valueOf(status.getCode()), - status.getMessage(), - getClass().getCanonicalName(), - msg, - configurationHelper.getAppcName(), - stateHelper.getCurrentOamState().toString() + String.valueOf(status.getCode()), + status.getMessage(), + getClass().getCanonicalName(), + msg, + configurationHelper.getAppcName(), + stateHelper.getCurrentOamState().toString() ); } /** * Set MDC properties. */ - void setInitialLogProperties() { + public final void setInitialLogProperties() { MDC.put(MDC_KEY_REQUEST_ID, commonHeader.getRequestId()); MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, commonHeader.getOriginatorId()); MDC.put(MDC_INSTANCE_UUID, ""); // value should be created in the future @@ -141,15 +139,14 @@ abstract class BaseCommon { /** * Clear MDC properties. */ - void clearRequestLogProperties() { - try { - MDC.remove(MDC_KEY_REQUEST_ID); - MDC.remove(MDC_SERVICE_INSTANCE_ID); - MDC.remove(MDC_SERVICE_NAME); - MDC.remove(LoggingConstants.MDCKeys.PARTNER_NAME); - MDC.remove(LoggingConstants.MDCKeys.TARGET_VIRTUAL_ENTITY); - } catch (Exception e) { - logger.error("Unable to clear the Request Log properties" + e.getMessage()); + public final void clearRequestLogProperties() { + for (String key : MDC_KEYS) { + try { + MDC.remove(key); + } catch (Exception e) { + logger.error( + String.format("Unable to clear the Log properties (%s) due to exception: %s", key, e.getMessage())); + } } } @@ -227,24 +224,24 @@ abstract class BaseCommon { errorMessage = EELFResourceManager.format(Msg.OAM_OPERATION_INVALID_INPUT, t.getMessage()); } else if (t instanceof InvalidStateException) { exceptionMessage = String.format(AppcOam.INVALID_STATE_MESSAGE_FORMAT, - rpc.getAppcOperation(), appName, stateHelper.getCurrentOamState()); + rpc.getAppcOperation(), appName, stateHelper.getCurrentOamState()); oamCommandStatus = OAMCommandStatus.REJECTED; errorMessage = EELFResourceManager.format(Msg.INVALID_STATE_TRANSITION, exceptionMessage); } else { oamCommandStatus = OAMCommandStatus.UNEXPECTED_ERROR; errorMessage = EELFResourceManager.format(Msg.OAM_OPERATION_EXCEPTION, t, - appName, t.getClass().getSimpleName(), rpc.name(), exceptionMessage); + appName, t.getClass().getSimpleName(), rpc.name(), exceptionMessage); } setStatus(oamCommandStatus, exceptionMessage); LoggingUtils.logErrorMessage( - String.valueOf(status.getCode()), - status.getMessage(), - LoggingConstants.TargetNames.APPC, - LoggingConstants.TargetNames.APPC_OAM_PROVIDER, - errorMessage, - AppcOam.class.getCanonicalName()); + String.valueOf(status.getCode()), + status.getMessage(), + LoggingConstants.TargetNames.APPC, + LoggingConstants.TargetNames.APPC_OAM_PROVIDER, + errorMessage, + AppcOam.class.getCanonicalName()); resetLogProperties(true); } diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseProcessor.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseProcessor.java index 784beccc3..aa5423d3b 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseProcessor.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseProcessor.java @@ -40,6 +40,8 @@ import org.openecomp.appc.statemachine.impl.readers.AppcOamStates; import java.util.Date; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Base processor for OAM APIs, such as maintenance mode, restart, start and stop API. @@ -48,11 +50,14 @@ import java.util.concurrent.Future; *

Specific API processor will overwrite the general methods to add specific behaviors. */ public abstract class BaseProcessor extends BaseCommon { + /** lock to serialize incoming OAM operations. */ + private static final Object LOCK = new Object(); + final AsyncTaskHelper asyncTaskHelper; final BundleHelper bundleHelper; - - Integer timeoutSeconds; + /** the requestTimeoutSeconds to use for this OAM operation */ + private Integer requestTimeoutSeconds; Msg auditMsg; BaseActionRunnable runnable; private Future scheduledRunnable = null; @@ -90,7 +95,8 @@ public abstract class BaseProcessor extends BaseCommon { try { preProcess(requestInput); - timeoutSeconds = operationHelper.getParamRequestTimeout(requestInput); + //The OAM request may specify timeout value + requestTimeoutSeconds = operationHelper.getParamRequestTimeout(requestInput); scheduleAsyncTask(); } catch (Exception e) { setErrorStatus(e); @@ -112,13 +118,31 @@ public abstract class BaseProcessor extends BaseCommon { * @throws APPCException when state validation failed */ protected void preProcess(final Object requestInput) - throws InvalidInputException, APPCException, InvalidStateException { + throws InvalidInputException, APPCException, InvalidStateException,InterruptedException,TimeoutException { + setInitialLogProperties(); operationHelper.isInputValid(requestInput); - AppcOamStates nextState = operationHelper.getNextState( - rpc.getAppcOperation(), stateHelper.getCurrentOamState()); - setInitialLogProperties(); - stateHelper.setState(nextState); + //All OAM operation pass through here first to validate if an OAM state change is allowed. + //If a state change is allowed cancel the occurring OAM (if any) before starting this one. + //we will synchronized so that only one can do this at any given time. + synchronized(LOCK) { + AppcOamStates currentOamState = stateHelper.getCurrentOamState(); + + //make sure this OAM operation can transition to the desired OAM operation + AppcOamStates nextState = operationHelper.getNextState( + rpc.getAppcOperation(), currentOamState); + + stateHelper.setState(nextState); + + //cancel the BaseActionRunnable currently executing + //it got to be completely terminated before proceeding + asyncTaskHelper.cancelBaseActionRunnable( + rpc, + currentOamState, + getTimeoutMilliseconds(), + TimeUnit.MILLISECONDS + ); + } } /** @@ -135,32 +159,49 @@ public abstract class BaseProcessor extends BaseCommon { protected void scheduleAsyncTask() { if (runnable == null) { logger.error(String.format( - "Skipped schedule async task for rpc(%s) due to runnable is null", rpc.name())); + "Skipped schedule async task for rpc(%s) due to runnable is null", rpc.name())); return; } - scheduledRunnable = asyncTaskHelper.scheduleAsyncTask(rpc, runnable); + scheduledRunnable = asyncTaskHelper.scheduleBaseRunnable( + runnable, runnable::abortRunnable, getInitialDelayMillis(), getDelayMillis()); } + /** - * Check if current running task is the same as schedule task - * @return true if they are the same, otherwise false. + * The timeout for this OAM operation. The timeout source is chosen in the following order: + * request, config file, default value + * @return - the timeout for this OAM operation. */ - boolean isSameAsyncTask() { - return asyncTaskHelper.getCurrentAsyncTask() == scheduledRunnable; + long getTimeoutMilliseconds() { + return configurationHelper.getOAMOperationTimeoutValue(this.requestTimeoutSeconds); } + /** - * Cancel schedueled async task through AsyncTaskHelper + * @return initialDelayMillis - the time to delay first execution of {@link BaseActionRunnable} + */ + protected long getInitialDelayMillis(){ + return 0L; + } + + /** + * @return delayMillis the delay between the consecutive executions of {@link BaseActionRunnable} + */ + private long getDelayMillis(){ + return 1000L; + } + + /** + * Cancel the scheduled {@link BaseActionRunnable} through AsyncTaskHelper */ void cancelAsyncTask() { if (scheduledRunnable == null) { logger.error(String.format( - "Skipped cancel schedule async task for rpc(%s) due to scheduledRunnable is null", rpc.name())); + "Skipped cancel schedule async task for rpc(%s) due to scheduledRunnable is null", rpc.name())); return; } - - asyncTaskHelper.cancelAsyncTask(scheduledRunnable); - scheduledRunnable = null; + scheduledRunnable.cancel(true); } + } diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamMmodeProcessor.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamMmodeProcessor.java index 973d0af36..6a0466022 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamMmodeProcessor.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamMmodeProcessor.java @@ -39,6 +39,8 @@ import org.openecomp.appc.requesthandler.LCMStateManager; import org.openecomp.appc.requesthandler.RequestHandler; import org.openecomp.appc.statemachine.impl.readers.AppcOamStates; +import java.util.concurrent.TimeoutException; + /** * Processor to handle maintenance mode OAM API. */ @@ -65,7 +67,7 @@ public class OamMmodeProcessor extends BaseProcessor { @Override protected void preProcess(final Object requestInput) - throws InvalidInputException, InvalidStateException, APPCException { + throws InvalidInputException, InvalidStateException, APPCException, InterruptedException, TimeoutException { super.preProcess(requestInput); //Close the gate so that no more new LCM request will be excepted. @@ -79,15 +81,26 @@ public class OamMmodeProcessor extends BaseProcessor { super.scheduleAsyncTask(); } + /** + * {@inheritDoc} + * For maintenance mode we want a longer delay before initial execution of {@link BaseActionRunnable} + * so that any accepted LCM actions have time to git scheduled in the Dispatcher. + */ + @Override + protected long getInitialDelayMillis(){ + //wait ten seconds before + return 10000L; + } + /** * This runnable does the async handling for the maintenance mode REST API, and will be scheduled to run * until terminating condition reaches. * - *

The runnable will conintue run if:
+ *

The runnable will continue run if:
* - the runnable is not canceled outside
* - the in progress LCM request count is not zero
*

When LCM request count reaches to zero, this runnable will:
- * - post message through operatonHelper
+ * - post message through operationHelper
* - set APP-C OAM state to maintenance mode
* - audit log the state
* - terminate this runnable itself
@@ -98,7 +111,7 @@ public class OamMmodeProcessor extends BaseProcessor { MyRunnable(BaseProcessor parent) { super(parent); - actionName = "OAM Maintanence mode"; + actionName = "OAM Maintenance mode"; auditMsg = Msg.OAM_OPERATION_MAINTENANCE_MODE; finalState = AppcOamStates.MaintenanceMode; } @@ -113,12 +126,6 @@ public class OamMmodeProcessor extends BaseProcessor { boolean checkState() { logDebug(String.format("Executing %s task", actionName)); - if (!myParent.isSameAsyncTask()) { - // cancel myself if I am not the current backgroundOamTask - myParent.cancelAsyncTask(); - logDebug(String.format("Finished %s task due to task removed", actionName)); - return true; - } boolean hasError = false; try { @@ -156,8 +163,8 @@ public class OamMmodeProcessor extends BaseProcessor { @Override void keepWaiting() { logDebug("The application '%s' has '%s' outstanding LCM request to complete" + - " before coming to a complete maintenance_mode.", - configurationHelper.getAppcName(), inprogressRequestCount); + " before coming to a complete maintenance_mode.", + configurationHelper.getAppcName(), inprogressRequestCount); } } } diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamRestartProcessor.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamRestartProcessor.java index e9f0ada56..857578917 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamRestartProcessor.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamRestartProcessor.java @@ -112,14 +112,14 @@ public class OamRestartProcessor extends BaseProcessor { @Override boolean doAction() { logDebug(String.format("Executing %s task at phase (%s)", - actionName, currentPhase == null ? "null" : currentPhase.name())); + actionName, currentPhase == null ? "null" : currentPhase.name())); boolean isBundleOperationCompleted = true; try { switch (currentPhase) { case ToStop: isBundleOperationCompleted = bundleHelper.bundleOperations( - AppcOam.RPC.stop, bundleNameToFuture, myParent.asyncTaskHelper); + AppcOam.RPC.stop, bundleNameToFuture, myParent.asyncTaskHelper, this); currentPhase = ActionPhases.Stopped; break; case Stopped: @@ -129,12 +129,12 @@ public class OamRestartProcessor extends BaseProcessor { currentPhase = ActionPhases.ToStart; } else { logDebug(String.format("%s task is waiting in stopped phase, current state is %s", - actionName, currentState)); + actionName, currentState)); } break; case ToStart: isBundleOperationCompleted = bundleHelper.bundleOperations( - AppcOam.RPC.start, bundleNameToFuture, myParent.asyncTaskHelper); + AppcOam.RPC.start, bundleNameToFuture, myParent.asyncTaskHelper, this); currentPhase = ActionPhases.Started; break; case Error: @@ -143,13 +143,13 @@ public class OamRestartProcessor extends BaseProcessor { default: // Should not reach log it and return false; logger.error("%s task doAction reached %s phase. not supported. return false.", - actionName, currentPhase.name()); + actionName, currentPhase.name()); stateHelper.setState(AppcOamStates.Error); return false; } if (isTimeout("restart doAction") - || hasBundleOperationFailure()) { + || hasBundleOperationFailure()) { currentPhase = ActionPhases.Error; return true; } diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamStartProcessor.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamStartProcessor.java index 0060bfcae..0d2f50513 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamStartProcessor.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamStartProcessor.java @@ -101,7 +101,7 @@ public class OamStartProcessor extends BaseProcessor { if (stateHelper.getState() != AppcOamStates.Started) { logDebug("Start - APPC OAM state is not started, start the bundles"); isBundleOperationCompleted = bundleHelper.bundleOperations( - rpc, bundleNameToFuture, myParent.asyncTaskHelper); + rpc, bundleNameToFuture, myParent.asyncTaskHelper, this); } if (isBundleOperationCompleted) { diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamStopProcessor.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamStopProcessor.java index d81901638..d8d88d319 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamStopProcessor.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamStopProcessor.java @@ -94,7 +94,7 @@ public class OamStopProcessor extends BaseProcessor { try { boolean isBundleOperationCompleted = bundleHelper.bundleOperations( - rpc, bundleNameToFuture, myParent.asyncTaskHelper); + rpc, bundleNameToFuture, myParent.asyncTaskHelper, this); if (isBundleOperationCompleted) { return true; } diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/AsyncTaskHelper.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/AsyncTaskHelper.java index db6033752..0a4b868a8 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/AsyncTaskHelper.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/AsyncTaskHelper.java @@ -27,34 +27,61 @@ package org.openecomp.appc.oam.util; import com.att.eelf.configuration.EELFLogger; import org.openecomp.appc.oam.AppcOam; import org.openecomp.appc.oam.processor.BaseActionRunnable; +import org.openecomp.appc.statemachine.impl.readers.AppcOamStates; import org.osgi.framework.Bundle; import org.osgi.framework.FrameworkUtil; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; /** - * Utility class provides general async task related help. + * The AsyncTaskHelper class manages an internal parent child data structure. The parent is a transient singleton, + * meaning only one can exist at any given time. The parent is scheduled with the + * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and is executed at configured interval. It can be + * terminated by using the {@link Future#cancel(boolean)} or the {@link Future#cancel(boolean)} returned from \ + * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}. + *

+ * The children are scheduled using {@link #submitBaseSubCallable(Callable)}} and can only be scheduled if a parent + * is scheduled. Children only execute once, but can be terminated preemptively by the {@link Future#cancel(boolean)} + * returned from {@link #submitBaseSubCallable(Callable)} or indirectly by terminating the parent via the method + * described above. + *

+ * This class augments the meaning of {@link Future#isDone()} in that it guarantees that this method only returns true + * if the scheduled {@link Runnable} or {@link Callable} is not currently executing and is not going to execute in the + * future. This is different than the Java core implementation of {@link Future#isDone()} in which it will return + * true immediately after the {@link Future#cancel(boolean)} is called. Even if a Thread is actively executing the + * {@link Runnable} or {@link Callable} and has not return yet. See Java BUG JDK-8073704 + *

+ * The parent {@link Future#isDone()} has an additional augmentation in that it will not return true until all of its + * children's {@link Future#isDone()} also return true. + * */ @SuppressWarnings("unchecked") public class AsyncTaskHelper { - final int MMODE_TASK_DELAY = 10000; - final int COMMON_INITIAL_DELAY = 0; - final int COMMON_INTERVAL = 1000; private final EELFLogger logger; private final ScheduledExecutorService scheduledExecutorService; private final ThreadPoolExecutor bundleOperationService; - /** Reference to the Async task */ - private volatile Future backgroundOamTask; - /** Reference to the runnable of Async task */ - private volatile BaseActionRunnable taskRunnable; + /** Reference to {@link MyFuture} return from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} */ + private MyFuture backgroundBaseRunnableFuture; + + /** The cancel Callback from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} */ + private Consumer cancelCallBackForBaseRunnable; + + /** All Futures created by thus calls which have not completed -- {@link Future#isDone()} equals false */ + private Set myFutureSet = new HashSet<>(); /** * Constructor @@ -64,103 +91,162 @@ public class AsyncTaskHelper { logger = eelfLogger; scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( - (runnable) -> { - Bundle bundle = FrameworkUtil.getBundle(AppcOam.class); - return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor"); - } + (runnable) -> { + Bundle bundle = FrameworkUtil.getBundle(AppcOam.class); + return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor"); + } ); bundleOperationService = new ThreadPoolExecutor( - 0, - 10, - 10, - TimeUnit.SECONDS, - new LinkedBlockingQueue(),// BlockingQueue workQueue - (runnable) -> new Thread(runnable, "OAM bundler operation executor")//ThreadFactory + 0, + 10, + 10, + TimeUnit.SECONDS, + new LinkedBlockingQueue(), //BlockingQueue workQueue + (runnable) -> { + Bundle bundle = FrameworkUtil.getBundle(AppcOam.class); + return new Thread(runnable, bundle.getSymbolicName() + " bundle operation executor"); + } ); } - void addThreadsToPool() { - bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize()); - } - - void removeThreadsFromPoolWhenDone() { - bundleOperationService.setCorePoolSize(0); - } - /** * Terminate the class ScheduledExecutorService */ public void close() { logDebug("Start shutdown scheduleExcutorService."); - scheduledExecutorService.shutdown(); - bundleOperationService.shutdown(); + bundleOperationService.shutdownNow(); + scheduledExecutorService.shutdownNow(); logDebug("Completed shutdown scheduleExcutorService."); } + /** - * Get current async task refernce - * @return the class backgroundOamTask + * Cancel currently executing {@link BaseActionRunnable} if any. + * This method returns immediately if there is currently no {@link BaseActionRunnable} actively executing. + * @param rpcCausingAbort - The RPC causing the abort + * @param stateBeingAbborted - The current state being canceled + * @param timeout - The amount of time to wait for a cancel to complete + * @param timeUnit - The unit of time of timeout + * @throws TimeoutException - If {@link BaseActionRunnable} has not completely cancelled within the timeout period + * @throws InterruptedException - If the Thread waiting for the abort */ - public Future getCurrentAsyncTask() { - return backgroundOamTask; + public synchronized void cancelBaseActionRunnable(final AppcOam.RPC rpcCausingAbort, + AppcOamStates stateBeingAbborted, + long timeout, TimeUnit timeUnit) + throws TimeoutException,InterruptedException { + + final MyFuture localBackgroundBaseRunnableFuture = backgroundBaseRunnableFuture; + final Consumer localCancelCallBackForBaseRunnable = cancelCallBackForBaseRunnable; + + if (localBackgroundBaseRunnableFuture == null || localBackgroundBaseRunnableFuture.isDone()) { + return; + } + + if (localCancelCallBackForBaseRunnable != null) { + localCancelCallBackForBaseRunnable.accept(rpcCausingAbort); + } + localBackgroundBaseRunnableFuture.cancel(true); + + long timeoutMillis = timeUnit.toMillis(timeout); + long expiryTime = System.currentTimeMillis() + timeoutMillis; + while (!(localBackgroundBaseRunnableFuture.isDone())) { + long sleepTime = expiryTime - System.currentTimeMillis(); + if (sleepTime < 1) { + break; + } + this.wait(sleepTime); + } + + if (!localBackgroundBaseRunnableFuture.isDone()) { + throw new TimeoutException(String.format("Unable to abort %s in timely manner.",stateBeingAbborted)); + } } /** - * Schedule a service for async task with the passed in parameters - * @param rpc of the REST API call, decides how to schedule the service + * Schedule a {@link BaseActionRunnable} to begin async execution. This is the Parent {@link Runnable} for the + * children that are submitted by {@link #submitBaseSubCallable(Callable)} + * + * The currently executing {@link BaseActionRunnable} must fully be terminated before the next can be scheduled. + * This means all Tasks' {@link MyFuture#isDone()} must equal true and all threads must return to their respective + * thread pools. + * * @param runnable of the to be scheduled service. - * @return the reference of the scheduled task + * @param cancelCallBack to be invoked when + * {@link #cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)} is invoked. + * @param initialDelayMillis the time to delay first execution + * @param delayMillis the delay between the termination of one + * execution and the commencement of the next + * @return The {@link BaseActionRunnable}'s {@link Future} + * @throws IllegalStateException if there is currently executing Task */ - public Future scheduleAsyncTask(final AppcOam.RPC rpc, final BaseActionRunnable runnable) { - int initialDelay, interval; - switch (rpc) { - case maintenance_mode: - initialDelay = interval =MMODE_TASK_DELAY; - break; - case start: - case stop: - case restart: - initialDelay = COMMON_INITIAL_DELAY; - interval = COMMON_INTERVAL; - break; - default: - // should not get here. Log it and return null - logDebug(String.format("Cannot scheudle task for unsupported RPC(%s).", rpc.name())); - return null; - } - - // Always cancel existing async task - if (backgroundOamTask != null) { - logDebug("Cancelling background task in schedule task."); - backgroundOamTask.cancel(true); - if (taskRunnable != null) { - taskRunnable.abortRunnable(rpc); - } + public synchronized Future scheduleBaseRunnable(final Runnable runnable, + final Consumer cancelCallBack, + long initialDelayMillis, + long delayMillis) + throws IllegalStateException { + + if (backgroundBaseRunnableFuture != null && !backgroundBaseRunnableFuture.isDone()) { + throw new IllegalStateException("Unable to schedule background task when one is already running. All task must fully terminated before another can be scheduled. "); } - taskRunnable = runnable; - backgroundOamTask = scheduledExecutorService.scheduleWithFixedDelay( - runnable, initialDelay, interval, TimeUnit.MILLISECONDS); + this.cancelCallBackForBaseRunnable = cancelCallBack; - return backgroundOamTask; - } + backgroundBaseRunnableFuture = new MyFuture(runnable) { + /** + * augments the cancel operation to cancel all subTack too, + */ + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + boolean cancel; + synchronized (AsyncTaskHelper.this) { + cancel = super.cancel(mayInterruptIfRunning); + myFutureSet.stream().filter(f->!this.equals(f)).forEach(f->f.cancel(mayInterruptIfRunning)); + } + return cancel; + } - Future submitBundleLcOperation(final Callable callable) { - return bundleOperationService.submit(callable); + /** + * augments the isDone operation to return false until all subTask have completed too. + */ + @Override + public boolean isDone() { + synchronized (AsyncTaskHelper.this) { + return myFutureSet.isEmpty(); + } + } + }; + backgroundBaseRunnableFuture.setFuture( + scheduledExecutorService.scheduleWithFixedDelay( + backgroundBaseRunnableFuture, initialDelayMillis, delayMillis, TimeUnit.MILLISECONDS) + ); + return backgroundBaseRunnableFuture; } /** - * Cancle a previously schedule task. If the task is the same as backgroundOamTask, set it to null. - * @param task to be canceled + * Submits children {@link Callable} to be executed as soon as possible, A parent must have been scheduled + * previously via {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} + * @param callable the Callable to be submitted + * @return The {@link Callable}'s {@link Future} */ - public void cancelAsyncTask(Future task) { - task.cancel(false); - if (task == backgroundOamTask) { - backgroundOamTask = null; - taskRunnable = null; - logDebug("Cancelling background task in cancel task."); + synchronized Future submitBaseSubCallable(final Callable callable) { + + if (backgroundBaseRunnableFuture == null + || backgroundBaseRunnableFuture.isCancelled() + || backgroundBaseRunnableFuture.isDone()){ + throw new IllegalStateException("Unable to schedule subCallable when a base Runnable is not running."); + } + + //Make sure the pool is ready to go + if(bundleOperationService.getPoolSize() != bundleOperationService.getMaximumPoolSize()){ + bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize()); + bundleOperationService.prestartAllCoreThreads(); + bundleOperationService.setCorePoolSize(0); } + + MyFuture myFuture = new MyFuture(callable); + myFuture.setFuture(bundleOperationService.submit((Callable)myFuture)); + return myFuture; } /** @@ -173,4 +259,132 @@ public class AsyncTaskHelper { logger.debug(String.format(message, args)); } } + + /** + * This class has two purposes. First it insures {@link #isDone()} only returns true if the deligate is not + * currently running and will not be running in the future: See Java BUG JDK-8073704 Second this class maintains + * the {@link #myFutureSet } by insurring that itself is removed when {@link #isDone()} returns true. + * + * See {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and {@link #submitBaseSubCallable(Callable)} + * for usage of this class + */ + private class MyFuture implements Future, Runnable, Callable { + + private Future future; + private final Runnable runnable; + private final Callable callable; + private boolean isRunning; + + MyFuture(Runnable runnable) { + this.runnable = runnable; + this.callable = null; + myFutureSet.add(this); + } + + MyFuture(Callable callable) { + this.runnable = null; + this.callable = callable; + myFutureSet.add(this); + } + + void setFuture(Future future) { + this.future = future; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + synchronized (AsyncTaskHelper.this) { + if (!isRunning) { + myFutureSetRemove(); + } + + return future.cancel(mayInterruptIfRunning); + } + } + + @Override + public boolean isCancelled() { + synchronized (AsyncTaskHelper.this) { + return future.isCancelled(); + } + } + + @Override + public boolean isDone() { + synchronized (AsyncTaskHelper.this) { + return future.isDone() && !isRunning; + } + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + + @Override + public void run() { + synchronized (AsyncTaskHelper.this) { + if(future.isCancelled()){ + return; + } + isRunning = true; + } + try { + runnable.run(); + } finally { + synchronized (AsyncTaskHelper.this) { + isRunning = false; + + //The Base Runnable is expected to run again. + //unless it has been canceled. + //so only removed if it is canceled. + if (future.isCancelled()) { + myFutureSetRemove(); + } + } + } + } + + @Override + public T call() throws Exception { + synchronized (AsyncTaskHelper.this) { + if(future.isCancelled()){ + throw new CancellationException(); + } + isRunning = true; + } + try { + return callable.call(); + } finally { + synchronized (AsyncTaskHelper.this){ + isRunning = false; + myFutureSetRemove(); + } + } + } + + + /** + * Removes this from the the myFutureSet. + * When all the BaseActionRunnable is Done notify any thread waiting in + * {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)} + */ + void myFutureSetRemove(){ + synchronized (AsyncTaskHelper.this) { + myFutureSet.remove(this); + if(myFutureSet.isEmpty()){ + backgroundBaseRunnableFuture = null; + cancelCallBackForBaseRunnable = null; + AsyncTaskHelper.this.notifyAll(); + + } + } + } + + } } diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/BundleHelper.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/BundleHelper.java index 7fbb3c453..74159bd65 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/BundleHelper.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/BundleHelper.java @@ -28,6 +28,7 @@ import com.att.eelf.configuration.EELFLogger; import org.apache.commons.lang3.ArrayUtils; import org.openecomp.appc.exceptions.APPCException; import org.openecomp.appc.oam.AppcOam; +import org.openecomp.appc.oam.processor.BaseCommon; import org.openecomp.appc.statemachine.impl.readers.AppcOamStates; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; @@ -73,8 +74,9 @@ public class BundleHelper { */ public boolean bundleOperations(AppcOam.RPC rpc, Map> threads, - AsyncTaskHelper taskHelper) - throws APPCException { + AsyncTaskHelper taskHelper, + BaseCommon baseCommon) + throws APPCException { long mStartTime = System.currentTimeMillis(); logDebug(String.format("Entering OAM bundleOperations with rpc (%s).", rpc.name())); @@ -88,7 +90,6 @@ public class BundleHelper { boolean isBundleOperationComplete = true; Map appcLcmBundles = getAppcLcmBundles(); - taskHelper.addThreadsToPool(); for (Map.Entry bundleEntry : appcLcmBundles.entrySet()) { String bundleName = bundleEntry.getKey(); Bundle bundle = bundleEntry.getValue(); @@ -99,30 +100,29 @@ public class BundleHelper { // such as when a Stop request is receive while APPC is still trying to Start Up. if (!stateHelper.isSameState(originalState)) { logger.warn("OAM %s bundle operation aborted since OAM state is no longer %s!", - originalState.name()); + originalState.name()); isBundleOperationComplete = false; break; } } threads.put(bundleName, - taskHelper.submitBundleLcOperation(new BundleTask(rpc, bundle))); + taskHelper.submitBaseSubCallable(new BundleTask(rpc, bundle,baseCommon))); } - taskHelper.removeThreadsFromPoolWhenDone(); logDebug(String.format("Leaving OAM bundleOperations with rpc (%s) with complete(%s), elasped (%d) ms.", - rpc.name(), Boolean.toString(isBundleOperationComplete), getElaspeTimeMs(mStartTime))); + rpc.name(), Boolean.toString(isBundleOperationComplete), getElapseTimeMs(mStartTime))); return isBundleOperationComplete; } - private long getElaspeTimeMs(long mStartTime) { + private long getElapseTimeMs(long mStartTime) { return System.currentTimeMillis() - mStartTime; } /** * Check if all BundleTasks are completed - * @param bundleNameFutureMap with bundler name and BundleTask Future object + * @param bundleNameFutureMap with bundle name and BundleTask Future object * @return true if all are done, otherwise, false */ public boolean isAllTaskDone(Map> bundleNameFutureMap) { @@ -131,20 +131,22 @@ public class BundleHelper { } /** - * Cancel BunldeTasks which are not finished - * @param bundleNameFutureMap with bundler name and BundleTask Future object + * Cancel BundleTasks which are not finished + * @param bundleNameFutureMap with bundle name and BundleTask Future object */ public void cancelUnfinished(Map> bundleNameFutureMap) { - bundleNameFutureMap.values().stream().filter((f) -> !f.isDone()).forEach((f) -> f.cancel(true)); + bundleNameFutureMap.values().stream().filter((f) + -> !f.isDone()).forEach((f) + -> f.cancel(true)); } /** * Get number of failed BundleTasks - * @param bundleNameFurtureMap with bundler name and BundleTask Future object + * @param bundleNameFutureMap with bundle name and BundleTask Future object * @return number(long) of the failed BundleTasks */ - public long getFailedMetrics(Map> bundleNameFurtureMap) { - return bundleNameFurtureMap.values().stream().map((f) -> { + public long getFailedMetrics(Map> bundleNameFutureMap) { + return bundleNameFutureMap.values().stream().map((f) -> { try { return f.get(); } catch (Exception e) { @@ -168,10 +170,10 @@ public class BundleHelper { BundleFilter bundleList = new BundleFilter(bundlesToStop, regExBundleNotStop, getBundleList()); logger.info(String.format("(%d) APPC bundles to Stop/Start: %s.", bundleList.getBundlesToStop().size(), - bundleList.getBundlesToStop().toString())); + bundleList.getBundlesToStop().toString())); logger.debug(String.format("(%d) APPC bundles that won't be Stopped/Started: %s.", - bundleList.getBundlesToNotStop().size(), bundleList.getBundlesToNotStop().toString())); + bundleList.getBundlesToNotStop().size(), bundleList.getBundlesToNotStop().toString())); return bundleList.getBundlesToStop(); } @@ -229,17 +231,21 @@ public class BundleHelper { private Bundle bundle; private String bundleName; private String actionName; + private final BaseCommon baseCommon; - BundleTask(AppcOam.RPC rpcIn, Bundle bundleIn) { + BundleTask(AppcOam.RPC rpcIn, Bundle bundleIn, BaseCommon baseCommon) { rpc = rpcIn; actionName = rpc.getAppcOperation().toString(); bundle = bundleIn; bundleName = bundle.getSymbolicName(); + this.baseCommon = baseCommon; } @Override public BundleTask call() throws Exception { try { + baseCommon.setInitialLogProperties(); + long bundleOperStartTime = System.currentTimeMillis(); logDebug(String.format("OAM %s bundle %s ===>", actionName, bundleName)); switch (rpc) { @@ -253,12 +259,15 @@ public class BundleHelper { // should do nothing } logDebug(String.format("OAM %s bundle %s completed <=== elasped %d", - actionName, bundleName, getElaspeTimeMs(bundleOperStartTime))); + actionName, bundleName, getElapseTimeMs(bundleOperStartTime))); } catch (BundleException e) { logger.error(String.format("Exception encountered when OAM %s bundle %s ", - actionName, bundleName), e); + actionName, bundleName), e); failException = e; } + finally { + baseCommon.clearRequestLogProperties(); + } return this; } } diff --git a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/ConfigurationHelper.java b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/ConfigurationHelper.java index c465b9b10..6e6ab036b 100644 --- a/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/ConfigurationHelper.java +++ b/appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/ConfigurationHelper.java @@ -30,12 +30,17 @@ import org.openecomp.appc.Constants; import org.openecomp.appc.configuration.Configuration; import org.openecomp.appc.configuration.ConfigurationFactory; +import java.util.concurrent.TimeUnit; + /** * Utility class provides general configuration helps */ public class ConfigurationHelper { final static String PROP_KEY_APPC_NAME = Constants.PROPERTY_APPLICATION_NAME; final static String PROP_KEY_METRIC_STATE = "metric.enabled"; + private final String OAM_OPERATION_TIMEOUT_SECOND = "appc.OAM.api.timeout"; + /** Default operation timeout set to 1 minute */ + private final int DEFAULT_OAM_OPERATION_TIMEOUT = 60; private final EELFLogger logger; private Configuration configuration = ConfigurationFactory.getConfiguration(); @@ -57,7 +62,7 @@ public class ConfigurationHelper { } /** - * Read property value of a specified proeprty key + * Read property value of a specified property key * * @param propertyKey string of the property key * @return String[] of the property values associated with the propertyKey @@ -77,4 +82,23 @@ public class ConfigurationHelper { } return new String[]{propertyValue}; } + + + + + + /** + * This method returns timeout in milliseconds. The source is chosen in the following order: + * The overrideTimeoutSeconds argument + * or {@link #OAM_OPERATION_TIMEOUT_SECOND} found in the configuration file + * or the {@link #DEFAULT_OAM_OPERATION_TIMEOUT} + * @param overrideTimeoutSeconds or null to us the other sources + * @return timeout in milliseconds + */ + public long getOAMOperationTimeoutValue(Integer overrideTimeoutSeconds) { + return overrideTimeoutSeconds == null ? + getConfig().getIntegerProperty(OAM_OPERATION_TIMEOUT_SECOND, DEFAULT_OAM_OPERATION_TIMEOUT) * 1000 + : + TimeUnit.MILLISECONDS.toMillis(overrideTimeoutSeconds); + } } diff --git a/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseActionRunnableTest.java b/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseActionRunnableTest.java index c5ad95e43..07500f441 100644 --- a/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseActionRunnableTest.java +++ b/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseActionRunnableTest.java @@ -22,7 +22,6 @@ * ============LICENSE_END========================================================= */ - package org.openecomp.appc.oam.processor; import com.att.eelf.configuration.EELFLogger; @@ -31,7 +30,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.common.header.CommonHeader; -import org.openecomp.appc.configuration.Configuration; import org.openecomp.appc.i18n.Msg; import org.openecomp.appc.oam.AppcOam; import org.openecomp.appc.oam.OAMCommandStatus; @@ -46,7 +44,6 @@ import org.powermock.reflect.Whitebox; import java.util.Date; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyMap; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -102,7 +99,6 @@ public class BaseActionRunnableTest { private StateHelper mockStateHelper = mock(StateHelper.class); private OperationHelper mockOperHelper = mock(OperationHelper.class); private ConfigurationHelper mockConfigHelper = mock(ConfigurationHelper.class); - private Configuration mockConfig = mock(Configuration.class); private BundleHelper mockBundleHelper = mock(BundleHelper.class); @SuppressWarnings("ResultOfMethodCallIgnored") @@ -111,11 +107,8 @@ public class BaseActionRunnableTest { // to avoid operation on logger fail, mock up the logger EELFLogger mockLogger = mock(EELFLogger.class); - Mockito.doReturn(mockConfig).when(mockConfigHelper).getConfig(); - Mockito.doReturn(10).when(mockConfig).getIntegerProperty(any(), anyInt()); - testProcessor = spy( - new TestProcessor(mockLogger, mockConfigHelper, mockStateHelper, null, mockOperHelper)); + new TestProcessor(mockLogger, mockConfigHelper, mockStateHelper, null, mockOperHelper)); Whitebox.setInternalState(testProcessor, "bundleHelper", mockBundleHelper); testBaseAcionRunnable = spy(new TestAbc(testProcessor)); @@ -127,41 +120,41 @@ public class BaseActionRunnableTest { Whitebox.setInternalState(testBaseAcionRunnable, "timeoutMs", 0); Whitebox.setInternalState(testBaseAcionRunnable, "startTimeMs", 0); Whitebox.setInternalState(testBaseAcionRunnable, "doTimeoutChecking", false); + long expectedTimeout = 10000L; + Mockito.doReturn(expectedTimeout).when(mockConfigHelper).getOAMOperationTimeoutValue(any()); testBaseAcionRunnable.setTimeoutValues(); - Assert.assertEquals("Should set timeoutMs", 10 * 1000, testBaseAcionRunnable.timeoutMs); + Assert.assertEquals("Should set timeoutMs", expectedTimeout, testBaseAcionRunnable.timeoutMs); Assert.assertTrue("Should set start time MS", testBaseAcionRunnable.startTimeMs != 0); Assert.assertTrue("Should do check", testBaseAcionRunnable.doTimeoutChecking); Whitebox.setInternalState(testBaseAcionRunnable, "timeoutMs", 0); Whitebox.setInternalState(testBaseAcionRunnable, "startTimeMs", 0); Whitebox.setInternalState(testBaseAcionRunnable, "doTimeoutChecking", false); - int timeoutSeconds = 20; - Whitebox.setInternalState(testProcessor, "timeoutSeconds", timeoutSeconds); + expectedTimeout = 20000L; + Mockito.doReturn(expectedTimeout).when(mockConfigHelper).getOAMOperationTimeoutValue(any()); testBaseAcionRunnable.setTimeoutValues(); - Assert.assertEquals("Should set timeoutMs", timeoutSeconds * 1000, testBaseAcionRunnable.timeoutMs); + Assert.assertEquals("Should set timeoutMs", expectedTimeout, testBaseAcionRunnable.timeoutMs); Assert.assertTrue("Should set start time MS", testBaseAcionRunnable.startTimeMs != 0); Assert.assertTrue("Should do check", testBaseAcionRunnable.doTimeoutChecking); Whitebox.setInternalState(testBaseAcionRunnable, "timeoutMs", 0); Whitebox.setInternalState(testBaseAcionRunnable, "startTimeMs", 0); Whitebox.setInternalState(testBaseAcionRunnable, "doTimeoutChecking", false); - - timeoutSeconds = 0; - Whitebox.setInternalState(testProcessor, "timeoutSeconds", timeoutSeconds); - Mockito.doReturn(0).when(mockConfig).getIntegerProperty( - testBaseAcionRunnable.OAM_OPERATION_TIMEOUT_SECOND, testBaseAcionRunnable.DEFAULT_OAM_OPERATION_TIMEOUT); + expectedTimeout = 0L; + Mockito.doReturn(expectedTimeout).when(mockConfigHelper).getOAMOperationTimeoutValue(any()); testBaseAcionRunnable.setTimeoutValues(); - Assert.assertEquals("Should set timeoutMs", timeoutSeconds * 1000, testBaseAcionRunnable.timeoutMs); + Assert.assertEquals("Should set timeoutMs", expectedTimeout, testBaseAcionRunnable.timeoutMs); Assert.assertTrue("Should not set start time MS", testBaseAcionRunnable.startTimeMs == 0); Assert.assertFalse("Should not do check", testBaseAcionRunnable.doTimeoutChecking); } + @Test public void testRun() throws Exception { // test doAction failed Whitebox.setInternalState(testBaseAcionRunnable, "doActionResult", false); testBaseAcionRunnable.run(); Assert.assertFalse("isWaiting should still be false", - Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting")); + Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting")); // test doAction success Whitebox.setInternalState(testBaseAcionRunnable, "doActionResult", true); @@ -170,13 +163,13 @@ public class BaseActionRunnableTest { Mockito.doReturn(true).when(testBaseAcionRunnable).checkState(); testBaseAcionRunnable.run(); Assert.assertFalse("isWaiting should still be false", - Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting")); + Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting")); // with checkState return false Mockito.doReturn(false).when(testBaseAcionRunnable).checkState(); testBaseAcionRunnable.run(); Assert.assertTrue("isWaiting should still be true", - Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting")); + Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting")); // should stay testBaseAcionRunnable.run(); @@ -187,11 +180,11 @@ public class BaseActionRunnableTest { public void testSetAbortStatus() throws Exception { testBaseAcionRunnable.setAbortStatus(); Assert.assertEquals("Should return abort code", OAMCommandStatus.ABORT.getResponseCode(), - testBaseAcionRunnable.status.getCode().intValue()); + testBaseAcionRunnable.status.getCode().intValue()); Assert.assertTrue("Should set abort due to execution error message", - testBaseAcionRunnable.status.getMessage().endsWith( - String.format(testBaseAcionRunnable.ABORT_MESSAGE_FORMAT, - testRpc.name(), testBaseAcionRunnable.DUE_TO_EXECUTION_ERROR))); + testBaseAcionRunnable.status.getMessage().endsWith( + String.format(testBaseAcionRunnable.ABORT_MESSAGE_FORMAT, + testRpc.name(), testBaseAcionRunnable.DUE_TO_EXECUTION_ERROR))); } @Test @@ -261,10 +254,10 @@ public class BaseActionRunnableTest { Assert.assertTrue("Should be timeout", testBaseAcionRunnable.isTimeout(parentName)); Mockito.verify(testBaseAcionRunnable, times(1)).postAction(any()); Assert.assertEquals("Should return timeout code", OAMCommandStatus.TIMEOUT.getResponseCode(), - testBaseAcionRunnable.status.getCode().intValue()); + testBaseAcionRunnable.status.getCode().intValue()); Assert.assertTrue("Should set timeout message", - testBaseAcionRunnable.status.getMessage().endsWith( - String.format(testBaseAcionRunnable.TIMEOUT_MESSAGE_FORMAT, testRpc.name(), timeoutMs))); + testBaseAcionRunnable.status.getMessage().endsWith( + String.format(testBaseAcionRunnable.TIMEOUT_MESSAGE_FORMAT, testRpc.name(), timeoutMs))); } @SuppressWarnings("unchecked") @@ -277,9 +270,8 @@ public class BaseActionRunnableTest { long failedNumber = 1; Mockito.doReturn(failedNumber).when(mockBundleHelper).getFailedMetrics(anyMap()); Assert.assertTrue("should return true", testBaseAcionRunnable.hasBundleOperationFailure()); - Mockito.verify(testBaseAcionRunnable, - times(1)).setStatus(OAMCommandStatus.UNEXPECTED_ERROR, - String.format(testBaseAcionRunnable.BUNDLE_OPERATION_FAILED_FORMAT, failedNumber)); + Mockito.verify(testBaseAcionRunnable, times(1)).setStatus(OAMCommandStatus.UNEXPECTED_ERROR, + String.format(testBaseAcionRunnable.BUNDLE_OPERATION_FAILED_FORMAT, failedNumber)); Mockito.verify(testBaseAcionRunnable, times(1)).postAction(AppcOamStates.Error); } @@ -289,11 +281,11 @@ public class BaseActionRunnableTest { AppcOam.RPC newRpc = AppcOam.RPC.restart; testBaseAcionRunnable.abortRunnable(newRpc); Assert.assertEquals("Should return abort code", OAMCommandStatus.ABORT.getResponseCode(), - testBaseAcionRunnable.status.getCode().intValue()); + testBaseAcionRunnable.status.getCode().intValue()); Assert.assertTrue("Should set abort due to new request message", - testBaseAcionRunnable.status.getMessage().endsWith( - String.format(testBaseAcionRunnable.ABORT_MESSAGE_FORMAT, testRpc.name(), - String.format(testBaseAcionRunnable.NEW_RPC_OPERATION_REQUEST, newRpc.name())))); + testBaseAcionRunnable.status.getMessage().endsWith( + String.format(testBaseAcionRunnable.ABORT_MESSAGE_FORMAT, testRpc.name(), + String.format(testBaseAcionRunnable.NEW_RPC_OPERATION_REQUEST, newRpc.name())))); Mockito.verify(mockOperHelper, times(1)).sendNotificationMessage(any(), any(), any()); Mockito.verify(testBaseAcionRunnable, times(1)).resetLogProperties(false); Mockito.verify(testBaseAcionRunnable, times(1)).resetLogProperties(true); diff --git a/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseCommonTest.java b/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseCommonTest.java index 3a9e76f27..3f51669f7 100644 --- a/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseCommonTest.java +++ b/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseCommonTest.java @@ -163,11 +163,20 @@ public class BaseCommonTest { testBaseCommon.setInitialLogProperties(); testBaseCommon.resetLogProperties(false); - Mockito.verify(testBaseCommon, times(2)).setInitialLogProperties(); + Mockito.verify(mockCommonHeader, times(2)).getRequestId(); + Mockito.verify(mockCommonHeader, times(2)).getOriginatorId(); Map oldMdcMap = Whitebox.getInternalState(testBaseCommon, "oldMdcContent"); Assert.assertTrue("Should have 5 entries in persisted map", oldMdcMap.size() == 5); testBaseCommon.resetLogProperties(false); - Mockito.verify(testBaseCommon, times(3)).setInitialLogProperties(); + Mockito.verify(mockCommonHeader, times(3)).getRequestId(); + Mockito.verify(mockCommonHeader, times(3)).getOriginatorId(); + + // test oldMdcMap is cleared + testBaseCommon.resetLogProperties(false); + Mockito.verify(mockCommonHeader, times(4)).getRequestId(); + Mockito.verify(mockCommonHeader, times(4)).getOriginatorId(); + oldMdcMap = Whitebox.getInternalState(testBaseCommon, "oldMdcContent"); + Assert.assertTrue("Should have 5 entries in persisted map", oldMdcMap.size() == 5); } } diff --git a/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseProcessorTest.java b/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseProcessorTest.java index 354053c8e..0109adf36 100644 --- a/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseProcessorTest.java +++ b/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseProcessorTest.java @@ -22,7 +22,6 @@ * ============LICENSE_END========================================================= */ - package org.openecomp.appc.oam.processor; import com.att.eelf.configuration.EELFLogger; @@ -46,8 +45,6 @@ import org.openecomp.appc.oam.util.StateHelper; import org.openecomp.appc.statemachine.impl.readers.AppcOamStates; import org.powermock.reflect.Whitebox; -import java.util.concurrent.Future; - import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -97,7 +94,7 @@ public class BaseProcessorTest { Mockito.doReturn(mockCommonHeader).when(mockInput).getCommonHeader(); testBaseProcessor = spy( - new TestAbc(null, mockConfigHelper, mockStateHelper, mockTaskHelper, mockOperHelper)); + new TestAbc(null, mockConfigHelper, mockStateHelper, mockTaskHelper, mockOperHelper)); Whitebox.setInternalState(testBaseProcessor, "commonHeader", mockCommonHeader); @@ -112,7 +109,7 @@ public class BaseProcessorTest { Mockito.doThrow(new InvalidInputException("test")).when(mockOperHelper).isInputValid(mockInput); Status status = testBaseProcessor.processRequest(mockInput); Assert.assertEquals("Should return reject", - OAMCommandStatus.INVALID_PARAMETER.getResponseCode(), status.getCode().intValue()); + OAMCommandStatus.INVALID_PARAMETER.getResponseCode(), status.getCode().intValue()); } @Test @@ -122,7 +119,7 @@ public class BaseProcessorTest { Mockito.doReturn(mockCommonHeader).when(mockInput).getCommonHeader(); Status status = testBaseProcessor.processRequest(mockInput); Assert.assertEquals("Should return success", - OAMCommandStatus.ACCEPTED.getResponseCode(), status.getCode().intValue()); + OAMCommandStatus.ACCEPTED.getResponseCode(), status.getCode().intValue()); } @Test(expected = InvalidInputException.class) @@ -135,7 +132,7 @@ public class BaseProcessorTest { public void testPreProcessWithInvalidState() throws Exception { Mockito.doReturn(currentState).when(mockStateHelper).getCurrentOamState(); Mockito.doThrow(new InvalidStateException("test")) - .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState); + .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState); testBaseProcessor.preProcess(mockInput); } @@ -143,7 +140,7 @@ public class BaseProcessorTest { public void testPreProcessWithAppcException() throws Exception { Mockito.doReturn(currentState).when(mockStateHelper).getCurrentOamState(); Mockito.doThrow(new APPCException("test")) - .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState); + .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState); testBaseProcessor.preProcess(mockInput); } @@ -152,7 +149,7 @@ public class BaseProcessorTest { Mockito.doReturn(currentState).when(mockStateHelper).getCurrentOamState(); AppcOamStates nextState = AppcOamStates.Starting; Mockito.doReturn(nextState) - .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState); + .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState); testBaseProcessor.preProcess(mockInput); Mockito.verify(mockOperHelper, times(1)).isInputValid(mockInput); Mockito.verify(mockOperHelper, times(1)).getNextState(testRpc.getAppcOperation(), currentState); @@ -163,32 +160,14 @@ public class BaseProcessorTest { public void testScheduleAsyncTask() throws Exception { // test no runnable testBaseProcessor.scheduleAsyncTask(); - Mockito.verify(mockTaskHelper, times(0)).scheduleAsyncTask(any(), any()); + Assert.assertTrue(Whitebox.getInternalState(testBaseProcessor, "runnable") == null); + Assert.assertTrue(Whitebox.getInternalState(testBaseProcessor, "scheduledRunnable") == null); BaseActionRunnable mockRunnable = mock(BaseActionRunnable.class); Whitebox.setInternalState(testBaseProcessor, "runnable", mockRunnable); testBaseProcessor.scheduleAsyncTask(); - Mockito.verify(mockTaskHelper, times(1)).scheduleAsyncTask(testRpc, mockRunnable); - } - - @Test - public void isSameAsyncTask() throws Exception { - Future mockTask1 = mock(Future.class); - Whitebox.setInternalState(testBaseProcessor, "scheduledRunnable", mockTask1); - Mockito.doReturn(mockTask1).when(mockTaskHelper).getCurrentAsyncTask(); - Assert.assertTrue("Shoudl be the same", testBaseProcessor.isSameAsyncTask()); - - Future mockTask2 = mock(Future.class); - Mockito.doReturn(mockTask2).when(mockTaskHelper).getCurrentAsyncTask(); - Assert.assertFalse("Shoudl not be the same", testBaseProcessor.isSameAsyncTask()); - } - - @Test - public void cancleAsyncTask() throws Exception { - Future mockTask = mock(Future.class); - Whitebox.setInternalState(testBaseProcessor, "scheduledRunnable", mockTask); - testBaseProcessor.cancelAsyncTask(); - Mockito.verify(mockTaskHelper, times(1)).cancelAsyncTask(mockTask); + // scheduledRunnable should still be null, there's no mock done + // as I have trouble to make mockTaskHelper.scheduleBaseRunnable to return a proper Future Assert.assertTrue(Whitebox.getInternalState(testBaseProcessor, "scheduledRunnable") == null); } diff --git a/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/util/AsyncTaskHelperTest.java b/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/util/AsyncTaskHelperTest.java index 873b21795..f81938e30 100644 --- a/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/util/AsyncTaskHelperTest.java +++ b/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/util/AsyncTaskHelperTest.java @@ -25,107 +25,641 @@ package org.openecomp.appc.oam.util; import com.att.eelf.configuration.EELFLogger; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.Mockito; import org.openecomp.appc.oam.AppcOam; -import org.openecomp.appc.oam.processor.BaseActionRunnable; -import org.powermock.reflect.Whitebox; +import org.openecomp.appc.statemachine.impl.readers.AppcOamStates; +import org.osgi.framework.Bundle; +import org.osgi.framework.FrameworkUtil; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; -import java.util.Arrays; +import java.util.LinkedList; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; +import static org.powermock.api.mockito.PowerMockito.mockStatic; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({FrameworkUtil.class}) public class AsyncTaskHelperTest { private AsyncTaskHelper asyncTaskHelper; - private ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class); - private BaseActionRunnable mockRunnable = mock(BaseActionRunnable.class); + + private long initialDelayMillis = 0; + private long delayMillis = 10; + @Before public void setUp() throws Exception { - asyncTaskHelper = new AsyncTaskHelper(null); - Whitebox.setInternalState(asyncTaskHelper, "scheduledExecutorService", mockScheduler); // to avoid operation on logger fail, mock up the logger EELFLogger mockLogger = mock(EELFLogger.class); - Whitebox.setInternalState(asyncTaskHelper, "logger", mockLogger); + + + mockStatic(FrameworkUtil.class); + Bundle myBundle = mock(Bundle.class); + Mockito.doReturn("TestBundle").when(myBundle).getSymbolicName(); + PowerMockito.when(FrameworkUtil.getBundle(any())).thenReturn(myBundle); + + asyncTaskHelper = new AsyncTaskHelper(mockLogger); + + } - @Test - public void testClose() throws Exception { + + @After + public void shutdown(){ asyncTaskHelper.close(); - Mockito.verify(mockScheduler, times(1)).shutdown(); } + + /** + * Test that Base Runnable + * + * Runs at a fix rate; + * Only one Base Runnable can be scheduled at time; + * Future.cancle stops the Base Runnable; + * That another Base Runnable can be scheduled once the previous isDone. + */ + @Test + public void test_scheduleBaseRunnable_Base_isDone() throws Exception{ + + + + //loop is to test we can run consecutive Base Runnable + for(int testIteration = 0; testIteration < 3;testIteration++){ + final ExecuteTest et = new ExecuteTest(); + + Future future = asyncTaskHelper.scheduleBaseRunnable( + et::test + , s -> { } + ,initialDelayMillis + ,delayMillis + ); + + //make sure it is running at a fix rate + Assert.assertTrue("It should be iterating", et.waitForTestExec(5000)); + Assert.assertFalse("It Should not be Done", future.isDone()); + Assert.assertTrue("It should be iterating", et.waitForTestExec(5000)); + Assert.assertFalse("It Should not be Done", future.isDone()); + + + //make sure a seconds Runnable cannot be scheduled when one is already running + try { + asyncTaskHelper.scheduleBaseRunnable(et::test + , s -> {} + ,initialDelayMillis + ,delayMillis + ); + Assert.fail("scheduling should have been prevented. "); + } catch (IllegalStateException e) { + //IllegalStateException means the second scheduling was not allowed. + } + + + //let it cancel itself + et.cancelSelfOnNextExecution(future); + + //it should be done after it executes itself one more time. + Assert.assertTrue("it should be done", waitFor(future::isDone, 5000)); + Assert.assertTrue("The test failed to execute", et.isExecuted); + } + + + } + + + /** + * Makes sure the Future.isDone one only returns true if its runnable is not currently executing and will not + * execute in the future. Default implementation of isDone() returns true immediately after the future is + * canceled -- Even if is there is still a thread actively executing the runnable + */ @Test - public void testGetCurrentAsyncTask() throws Exception { - Future mockTask = mock(Future.class); - Whitebox.setInternalState(asyncTaskHelper, "backgroundOamTask", mockTask); - Assert.assertEquals("Should return mock task", mockTask, asyncTaskHelper.getCurrentAsyncTask()); + public void test_scheduleBaseRunnable_Base_isDone_Ignore_Interrupt() throws Exception{ + + + final ExecuteTest et = new ExecuteTest(); + + //configure test to run long and ignore interrupt + et.isContinuous = true; + et.isIgnoreInterrupt = true; + + + + Future future = asyncTaskHelper.scheduleBaseRunnable( + et::test + , s->{} + ,initialDelayMillis + ,delayMillis + ); + + //make sure it is running + Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000)); + Assert.assertTrue("It should be running",et.waitForTestExec(1000)); + Assert.assertFalse("It Should not be Done", future.isDone()); + + //cancel it and make sure it is still running + future.cancel(true); + Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000)); + Assert.assertTrue("It should be running",et.waitForTestExec(1000)); + Assert.assertFalse("It Should not be Done", future.isDone()); + + //let the thread die and then make sure its done + et.isContinuous = false; + Assert.assertTrue("It should not be running",waitForNot(et::isExecuting,1000)); + Assert.assertTrue("It Should be Done", future.isDone()); + } + + + + /** + * Make sure the base Future.isDone returns false until the sub callable has completed execution. + */ @Test - public void testScheduleAsyncTaskWithMmod() throws Exception { - // test maintenance mode - ScheduledFuture mockTask0 = mock(ScheduledFuture.class); - Whitebox.setInternalState(asyncTaskHelper, "backgroundOamTask", mockTask0); + public void test_scheduleBaseRunnable_SubTask_isDone_Ignore_Interrupt() throws Exception{ + + + final ExecuteTest baseET = new ExecuteTest(); + final ExecuteTest subET = new ExecuteTest(); + + //configure sub test to run long and ignore interrupt + subET.isContinuous = true; + subET.isIgnoreInterrupt = true; + + + //schedule the Base test to run and make sure it is running. + Future baseFuture = asyncTaskHelper.scheduleBaseRunnable( + baseET::test + ,s->{} + ,initialDelayMillis + ,delayMillis + ); + Assert.assertTrue("baseET should be running",waitFor(baseET::isExecuted,1000)); + Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone()); + + + //schedule the sub task and make sure it is running + Future subFuture = asyncTaskHelper.submitBaseSubCallable(subET::test); + Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000)); + Assert.assertTrue("subET should be running",subET.waitForTestExec(1000)); + Assert.assertFalse("subET Should not be Done", subFuture.isDone()); + Assert.assertFalse("baseET Should not be Done", baseFuture.isDone()); + + //cancel the base task and make sure isDone is still false + baseFuture.cancel(true); + Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000)); + Assert.assertTrue("subET should be running",subET.waitForTestExec(1000)); + Assert.assertFalse("subET Should not be Done",subFuture.isDone()); + Assert.assertFalse("baseET Should not be Done", baseFuture.isDone()); + + + //let the sub task die and and make sure the base is now finally done + subET.isContinuous = false; + Assert.assertTrue("subET should not be running",waitForNot(subET::isExecuting,1000)); + Assert.assertTrue("subET Should be Done", subFuture.isDone()); + Assert.assertTrue("baseET Should be Done", baseFuture.isDone()); - ScheduledFuture mockTask1 = mock(ScheduledFuture.class); - Mockito.doReturn(mockTask1).when(mockScheduler).scheduleWithFixedDelay( - mockRunnable, asyncTaskHelper.MMODE_TASK_DELAY, - asyncTaskHelper.MMODE_TASK_DELAY, TimeUnit.MILLISECONDS); - asyncTaskHelper.scheduleAsyncTask(AppcOam.RPC.maintenance_mode, mockRunnable); - Mockito.verify(mockTask0, times(1)).cancel(true); - Assert.assertEquals(mockTask1, asyncTaskHelper.scheduleAsyncTask(AppcOam.RPC.maintenance_mode, mockRunnable)); - Assert.assertEquals("Should set backgroundOamTask", mockTask1, asyncTaskHelper.getCurrentAsyncTask()); } + + /** + * Make sure the base Future.isDone returns false until the 3 sub callable has completed execution. + * Each sub callable will be shutdown one at a time. + */ @Test - public void testScheduleAsyncTaskWithStart() throws Exception { - for (AppcOam.RPC rpc : Arrays.asList(AppcOam.RPC.start, AppcOam.RPC.stop, AppcOam.RPC.restart)) { - runTest(rpc); + public void test_scheduleBaseRunnable_SubTasks_isDone() throws Exception { + + + //loop is to test we can run consecutive Base Runnable + for (int testIteration = 0; testIteration < 3; testIteration++) { + final ExecuteTest baseET = new ExecuteTest(); + final LinkedList subList = new LinkedList<>(); + for (int i = 0; i < 3; i++) { + Sub sub = new Sub(); + sub.et.isContinuous = true; + subList.add(sub); + } + + + //schedule the base runnable and make sure it is running + Future baseFuture = asyncTaskHelper.scheduleBaseRunnable( + baseET::test + , s -> { + } + , initialDelayMillis + , delayMillis + ); + Assert.assertTrue("baseET should be running", waitFor(baseET::isExecuted, 1000)); + Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone()); + + + //schedule the sub Callables and make sure these are running + subList.forEach(sub -> sub.future = asyncTaskHelper.submitBaseSubCallable(sub.et::test)); + for (Sub sub : subList) { + Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 100)); + Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000)); + Assert.assertFalse("subET Should not be Done", sub.future.isDone()); + } + Assert.assertFalse("baseET Should not be Done", baseFuture.isDone()); + + + //On each iteration shut down a sub callable. Make sure it stops, the others are still running and the + // //base is still running. + while (!subList.isEmpty()) { + + //stop one sub and make sure it stopped + { + Sub sub = subList.removeFirst(); + Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000)); + sub.et.isContinuous = false; + Assert.assertTrue("subET should not be running", waitForNot(sub.et::isExecuting,1000)); + Assert.assertTrue("subET Should not be Done", sub.future.isDone()); + } + + //make sure the other are still running + for (Sub sub : subList) { + Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000)); + Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000)); + Assert.assertFalse("subET Should not be Done", sub.future.isDone()); + } + + //Make sure the Base is still running + Assert.assertFalse("baseET Should not be Done", baseFuture.isDone()); + } + + //let the base cancel itself and make sure it stops + baseET.cancelSelfOnNextExecution(baseFuture); + Assert.assertTrue("baseET should be done", waitFor(baseFuture::isDone, 1000)); + } + } + + + /** + * Make sure SubCallable cannot be scheduled when there is not BaseRunnable + */ + @Test(expected=IllegalStateException.class) + public void test_SubTasksScheduleFailWhenNoBase() throws Exception { + asyncTaskHelper.submitBaseSubCallable(()->null); + } + + + + /** + * Make sure SubCallable cannot be scheduled when BaseRunnable is cancelled but is still actively running. + */ + @Test(expected=IllegalStateException.class) + public void test_SubTasksScheduleFailWhenBaseCanceledBeforeisDone() throws Exception { + + final ExecuteTest et = new ExecuteTest(); + et.isContinuous = true; + + Future future = asyncTaskHelper.scheduleBaseRunnable( + et::test + , s -> { } + ,initialDelayMillis + ,delayMillis + ); + + Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000)); + future.cancel(false); + Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000)); + + try { + asyncTaskHelper.submitBaseSubCallable(() -> null); + } finally { + et.isContinuous = false; } + + + } - private void runTest(AppcOam.RPC rpc) { - ScheduledFuture mockTask0 = mock(ScheduledFuture.class); - Whitebox.setInternalState(asyncTaskHelper, "backgroundOamTask", mockTask0); - BaseActionRunnable mockRunnable0 = mock(BaseActionRunnable.class); - Whitebox.setInternalState(asyncTaskHelper, "taskRunnable", mockRunnable0); - ScheduledFuture mockTask2 = mock(ScheduledFuture.class); - Mockito.doReturn(mockTask2).when(mockScheduler).scheduleWithFixedDelay( - mockRunnable, asyncTaskHelper.COMMON_INITIAL_DELAY, - asyncTaskHelper.COMMON_INTERVAL, TimeUnit.MILLISECONDS); - asyncTaskHelper.scheduleAsyncTask(rpc, mockRunnable); - Mockito.verify(mockTask0, times(1)).cancel(true); - Mockito.verify(mockRunnable0, times(1)).abortRunnable(rpc); - Assert.assertEquals(mockTask2, asyncTaskHelper.scheduleAsyncTask(rpc, mockRunnable)); - Assert.assertEquals("Should set backgroundOamTask", mockTask2, asyncTaskHelper.getCurrentAsyncTask()); + /** + * Make sure SubCallable cannot be scheduled after a BaseRunnable has completed + */ + @Test(expected=IllegalStateException.class) + public void test_SubTasksScheduleFailAfterBaseDone() throws Exception { + + final ExecuteTest et = new ExecuteTest(); + + Future future = asyncTaskHelper.scheduleBaseRunnable( + et::test + , s -> { } + ,initialDelayMillis + ,delayMillis + ); + + + future.cancel(false); + Assert.assertTrue("It should not be running",waitFor(future::isDone,1000)); + + try { + asyncTaskHelper.submitBaseSubCallable(() -> null); + } finally { + et.isContinuous = false; + } + } + + /** + * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}} + * Test cancel does not block when BaseRunnable is not scheduled + */ @Test - public void testCancelAsyncTask() throws Exception { - Future mockTask = mock(Future.class); - Whitebox.setInternalState(asyncTaskHelper, "backgroundOamTask", mockTask); - asyncTaskHelper.cancelAsyncTask(mockTask); - Mockito.verify(mockTask, times(1)).cancel(false); - Assert.assertTrue("Should have reset backgroundOamTask", - asyncTaskHelper.getCurrentAsyncTask() == null); - - - Whitebox.setInternalState(asyncTaskHelper, "backgroundOamTask", mockTask); - Future mockTask2 = mock(Future.class); - asyncTaskHelper.cancelAsyncTask(mockTask2); - Mockito.verify(mockTask2, times(1)).cancel(false); - Assert.assertEquals("Should not reset backgroundOamTask", - mockTask, asyncTaskHelper.getCurrentAsyncTask()); + public void test_cancel_noBlockingWhenBaseRunnableNotScheduled() throws Exception{ + //nothing is running so this should return immediately without TimeoutException + asyncTaskHelper.cancelBaseActionRunnable(AppcOam.RPC.stop , AppcOamStates.Started , 1, TimeUnit.MILLISECONDS); + } + + + + /** + * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}} + * Test cancel does blocks until BaseRunnable is done scheduled + */ + @Test() + public void test_cancel_BlockingWhenBaseRunnableNotDone() throws Exception { + + + final ExecuteTest et = new ExecuteTest(); + et.isContinuous = true; + et.isIgnoreInterrupt = true; + asyncTaskHelper.scheduleBaseRunnable( + et::test + , s -> { + } + , initialDelayMillis + , delayMillis + ); + + Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000)); + + + //we should get a timeout + try { + asyncTaskHelper.cancelBaseActionRunnable( + AppcOam.RPC.stop, + AppcOamStates.Started, + 1, + TimeUnit.MILLISECONDS); + Assert.fail("Should have gotten TimeoutException"); + } catch (TimeoutException e) { + //just ignore as it is expected + } + + + //release the test thread + et.isContinuous = false; + + + //we should not get a timeout + asyncTaskHelper.cancelBaseActionRunnable( + AppcOam.RPC.stop, + AppcOamStates.Started, + 1000, + TimeUnit.MILLISECONDS); + + } + + + + /** + * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}} + * Test cancel does not block when BaseRunnable is not scheduled + */ + @Test + public void test_BaseRunnableCancelCallback() throws Exception{ + + AtomicReference cancelCallback = new AtomicReference<>(null); + + final ExecuteTest et = new ExecuteTest(); + et.isContinuous = true; + Future future = asyncTaskHelper.scheduleBaseRunnable( + et::test + , cancelCallback::set + , initialDelayMillis + , delayMillis + ); + + Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000)); + Assert.assertTrue("It should be running", waitForNot(future::isDone, 1000)); + + + try { + asyncTaskHelper.cancelBaseActionRunnable( + AppcOam.RPC.stop, + AppcOamStates.Started, + 1, + TimeUnit.MILLISECONDS); + Assert.fail("Should have gotten TimeoutException"); + } catch (TimeoutException e) { + //just ignore as it is expected + } + + + Assert.assertEquals("Unexpected rpc in call back",AppcOam.RPC.stop,cancelCallback.get()); + } + + + + + + + + + /** + * @return true if the negation of the expected value is returned from the supplier within the specified + * amount of time + */ + private static boolean waitForNot(Supplier s,long timeoutMillis)throws Exception{ + return waitFor(()->!s.get(),timeoutMillis); + } + + + /** + * @return true if the expected value is returned from the supplier within the specified + * amount of time + */ + private static boolean waitFor(Supplier s,long timeoutMillis) throws Exception { + long timeout = TimeUnit.MILLISECONDS.toMillis(timeoutMillis); + long expiryTime = System.currentTimeMillis() + timeout; + long elapsedTime; + while(!s.get()){ + elapsedTime = expiryTime - System.currentTimeMillis(); + if(elapsedTime < 1) { + break; + } + Thread.sleep(10); + } + return s.get(); + } + + + /** + * This class is used control a thread executed in th {@link #test()} + */ + @SuppressWarnings("unused") + private static class ExecuteTest { + + + /** A fail safe to insure this TEst does not run indefinitely */ + private final long EXPIRY_TIME = System.currentTimeMillis() + 10000; + + + + /** A thread sets this value to true when it has completed the execution the of executes {@link #test()} */ + private volatile boolean isExecuted = false; + + /** + * A thread sets this value to true when it is actively executing {@link #test()} and back to false when + * it is not + */ + private volatile boolean isExecuting = false; + + /** + * While this value is true, a thread will not be allowed to return from {@link #test()} It will simulate a + * long execution. + */ + private volatile boolean isContinuous = false; + + /** + * When this value is set to true, an ongoing simulation of a long execution of {@link #test()} cannot be force + * to abort via a {@link Thread#interrupt()} + */ + private volatile boolean isIgnoreInterrupt = false; + + + + /** Use to send a signal to the thread executing {@link #notifyTestExcuted(long)} */ + private Semaphore inner = new Semaphore(0); + + /** Use to send a signal to the thread executing {@link #waitForTestExec(long)} */ + private Semaphore outer = new Semaphore(0); + + /** The {@link Future} of the Thread executing {@link #test()}*/ + private volatile Future future; + + /** + * When set the Thread executing {@link #test()} will cancel itself + * @param future - The {@link Future} of the Thread executing {@link #test()} + */ + private void cancelSelfOnNextExecution(Future future) { + this.future = future; + } + + + private boolean isExecuted() { + return isExecuted; + } + + private boolean isExecuting() { + return isExecuting; + } + + + private boolean isContinuous() { + return isContinuous; + } + + + private boolean isIgnoreInterrupt() { + return isIgnoreInterrupt; + } + + + + /** + * The thread executing this method if blocked from returning until the thread executing + * {@link #test()} invokes {@link #notifyTestExcuted(long)} or the specified time elapses + * @param timeoutMillis - the amount of time to wait for a execution iteration. + * @return true if the Thread is released because of an invocation of {@link #notifyTestExcuted(long)} + * @throws InterruptedException - If the Caller thread is interrupted. + */ + private boolean waitForTestExec(long timeoutMillis) throws InterruptedException { + inner.release(); + return outer.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS); + } + + + /** + * Test simulator + * @return Always returns true. + */ + private Boolean test() { + isTestExpired(); + System.out.println("started"); + isExecuting = true; + try { + if (future != null) { + future.cancel(false); + } + if(!isContinuous){ + notifyTestExcuted(1); + } + + while(isContinuous){ + notifyTestExcuted(100); + isTestExpired(); + } + + } finally { + isExecuting = false; + isExecuted = true; + } + return true; + } + + + /** @throws RuntimeException if the test has bee running too long */ + private void isTestExpired(){ + if(System.currentTimeMillis() > EXPIRY_TIME){ + throw new RuntimeException("Something went wrong the test expired."); + } + } + + + /** + * The thread executing {@link #test()} if blocked from returning until another thread invokes + * {@link #waitForTestExec(long)} or the specified time elapses + * @param timeoutMillis - the amount of time to wait for a execution iteration. + * @return true if the Thread is released because of an invocation of {@link #waitForTestExec(long)} + */ + private boolean notifyTestExcuted(long timeoutMillis){ + try { + boolean acquire = inner.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS); + if(acquire){ + outer.release(); + System.out.println("release"); + } + } catch (InterruptedException e) { + if(!isIgnoreInterrupt){ + return false; + } + } + return true; + } + } + + + static class Sub { + ExecuteTest et = new ExecuteTest(); + Future future = null; } } diff --git a/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/util/BundleHelperTest.java b/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/util/BundleHelperTest.java index 2edc285f0..878d15f39 100644 --- a/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/util/BundleHelperTest.java +++ b/appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/util/BundleHelperTest.java @@ -80,7 +80,7 @@ public class BundleHelperTest { mapFromGetAppcLcmBundles.put("BundleString", mockBundle); PowerMockito.doReturn(mapFromGetAppcLcmBundles).when(bundleHelper, MemberMatcher.method( - BundleHelper.class, "getAppcLcmBundles")).withNoArguments(); + BundleHelper.class, "getAppcLcmBundles")).withNoArguments(); StateHelper mockStateHelper = mock(StateHelper.class); Whitebox.setInternalState(bundleHelper, "stateHelper", mockStateHelper); @@ -90,25 +90,25 @@ public class BundleHelperTest { // test start Mockito.doReturn(true).when(mockStateHelper).isSameState(appcOamStates); - boolean result = bundleHelper.bundleOperations(AppcOam.RPC.start, new HashMap<>(), mockTaskHelper); + boolean result = bundleHelper.bundleOperations(AppcOam.RPC.start, new HashMap<>(), mockTaskHelper,null); Assert.assertTrue("Should be completed", result); - Mockito.verify(mockTaskHelper, times(1)).submitBundleLcOperation(any()); + Mockito.verify(mockTaskHelper, times(1)).submitBaseSubCallable(any()); // test start aborted Mockito.doReturn(false).when(mockStateHelper).isSameState(appcOamStates); - result = bundleHelper.bundleOperations(AppcOam.RPC.start, new HashMap<>(), mockTaskHelper); + result = bundleHelper.bundleOperations(AppcOam.RPC.start, new HashMap<>(), mockTaskHelper,null); Assert.assertFalse("Should be abort", result); - Mockito.verify(mockTaskHelper, times(1)).submitBundleLcOperation(any()); + Mockito.verify(mockTaskHelper, times(1)).submitBaseSubCallable(any()); // test stop - result = bundleHelper.bundleOperations(AppcOam.RPC.stop, new HashMap<>(), mockTaskHelper); + result = bundleHelper.bundleOperations(AppcOam.RPC.stop, new HashMap<>(), mockTaskHelper,null); Assert.assertTrue("Should be completed", result); - Mockito.verify(mockTaskHelper, times(2)).submitBundleLcOperation(any()); + Mockito.verify(mockTaskHelper, times(2)).submitBaseSubCallable(any()); } @Test(expected = APPCException.class) public void testBundleOperationsRpcException() throws Exception { - bundleHelper.bundleOperations(AppcOam.RPC.maintenance_mode, new HashMap<>(), mockTaskHelper); + bundleHelper.bundleOperations(AppcOam.RPC.maintenance_mode, new HashMap<>(), mockTaskHelper,null); } @Test @@ -156,7 +156,7 @@ public class BundleHelperTest { Mockito.doReturn(null).when(fakeConf).getProperty(propKey); String[] propResult = bundleHelper.readPropsFromPropListName(propKey); Assert.assertArrayEquals("PropertyResult should be empty string array", - ArrayUtils.EMPTY_STRING_ARRAY, propResult); + ArrayUtils.EMPTY_STRING_ARRAY, propResult); // Property has one entry String propValue1 = "1234"; String propValue2 = "5678";