serializing OAM async task 45/16145/2
authorJoey Sullivan <joey.sullivan@amdocs.com>
Wed, 27 Sep 2017 23:02:31 +0000 (23:02 +0000)
committerPatrick Brady <pb071s@att.com>
Wed, 27 Sep 2017 23:54:32 +0000 (23:54 +0000)
Change-Id: I0c98636c165a2cc5b9915a3950ab64744e6328c7
Issue-Id: APPC-244
Signed-off-by: Joey Sullivan <joey.sullivan@amdocs.com>
15 files changed:
appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseActionRunnable.java
appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseCommon.java
appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/BaseProcessor.java
appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamMmodeProcessor.java
appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamRestartProcessor.java
appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamStartProcessor.java
appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/processor/OamStopProcessor.java
appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/AsyncTaskHelper.java
appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/BundleHelper.java
appc-oam/appc-oam-bundle/src/main/java/org/openecomp/appc/oam/util/ConfigurationHelper.java
appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseActionRunnableTest.java
appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseCommonTest.java
appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/processor/BaseProcessorTest.java
appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/util/AsyncTaskHelperTest.java
appc-oam/appc-oam-bundle/src/test/java/org/openecomp/appc/oam/util/BundleHelperTest.java

index 542e53d..b118eb0 100644 (file)
@@ -45,9 +45,6 @@ import java.util.concurrent.Future;
  *   <br>  - finalState
  */
 public abstract class BaseActionRunnable extends BaseCommon implements Runnable {
-    final String OAM_OPERATION_TIMEOUT_SECOND = "appc.OAM.api.timeout";
-    /** Default operation tiemout set to 1 minute */
-    final int DEFAULT_OAM_OPERATION_TIMEOUT = 60;
     /** Abort due to rejection message format with flexible operation name */
     final String ABORT_MESSAGE_FORMAT = "Aborting %s operation due to %s.";
     /** Timeout message format with flexible operation name */
@@ -58,7 +55,6 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable
     final String DUE_TO_EXECUTION_ERROR = "due to execution error.";
 
     private boolean isWaiting = false;
-    private AppcOamStates currentState;
     long startTimeMs = 0;
     long timeoutMs = 0;
     boolean doTimeoutChecking = false;
@@ -80,32 +76,25 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable
 
         rpc = parent.rpc;
         commonHeader = parent.commonHeader;
-        startTime = parent.startTime;
         myParent = parent;
-
         setTimeoutValues();
     }
 
     /**
-     * Set timeout in milliseconds
+     * Collect the timeout value for this {@link BaseActionRunnable}
      */
     void setTimeoutValues() {
-        Integer timeoutSeconds = myParent.timeoutSeconds;
-        if (timeoutSeconds == null) {
-            timeoutMs = configurationHelper.getConfig().getIntegerProperty(
-                    OAM_OPERATION_TIMEOUT_SECOND, DEFAULT_OAM_OPERATION_TIMEOUT) * 1000;
-        } else {
-            timeoutMs = timeoutSeconds.longValue() * 1000;
-        }
-
+        startTime = myParent.startTime;
+        timeoutMs = myParent.getTimeoutMilliseconds();
         doTimeoutChecking = timeoutMs != 0;
         if (doTimeoutChecking) {
             startTimeMs = startTime.getTime();
         }
         logDebug("%s action runnable check timeout (%s) with timeout (%d)ms, and startMs (%d)",
-                rpc.name(), Boolean.toString(doTimeoutChecking), timeoutMs, startTimeMs);
+            rpc.name(), Boolean.toString(doTimeoutChecking), timeoutMs, startTimeMs);
     }
 
+
     /**
      * Abort operation handling due to outside interruption, does<br>
      *     - set ABORT status<br>
@@ -114,7 +103,7 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable
      *
      * @param newRpc of the new AppcOam.RPC operation.
      */
-    public void abortRunnable(final AppcOam.RPC newRpc) {
+    void abortRunnable(final AppcOam.RPC newRpc) {
         resetLogProperties(false);
 
         String additionalMsg = String.format(NEW_RPC_OPERATION_REQUEST, newRpc);
@@ -131,7 +120,7 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable
         try {
             setInitialLogProperties();
             logDebug(String.format("===========in %s run (waiting: %s)=======",
-                    actionName, Boolean.toString(isWaiting)));
+                actionName, Boolean.toString(isWaiting)));
 
             if (isWaiting) {
                 if (!checkState()) {
@@ -159,7 +148,7 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable
      */
     void keepWaiting() {
         logDebug(String.format("%s runnable waiting, current state is %s.",
-                actionName, currentState == null ? "null" : currentState.toString()));
+            actionName, stateHelper.getCurrentOamState()));
 
         isTimeout("keepWaiting");
     }
@@ -173,9 +162,9 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable
     boolean isTimeout(String parentName) {
         logDebug(String.format("%s task isTimeout called from %s", actionName, parentName));
         if (doTimeoutChecking
-                && System.currentTimeMillis() - startTimeMs > timeoutMs) {
+            && System.currentTimeMillis() - startTimeMs > timeoutMs) {
             logger.error(String.format("%s operation timeout (%d) ms has reached, abort with error state.",
-                    actionName, timeoutMs));
+                actionName, timeoutMs));
 
             setStatus(OAMCommandStatus.TIMEOUT, String.format(TIMEOUT_MESSAGE_FORMAT, rpc.name(), timeoutMs));
             postAction(AppcOamStates.Error);
@@ -253,8 +242,7 @@ public abstract class BaseActionRunnable extends BaseCommon implements Runnable
             return true;
         }
 
-        currentState = stateHelper.getBundlesState();
-        if (currentState == finalState) {
+        if (stateHelper.getBundlesState() == finalState) {
             setStatus(OAMCommandStatus.SUCCESS);
             postDoAction(true);
             return true;
index ccb5730..9c28007 100644 (file)
@@ -53,7 +53,6 @@ import static com.att.eelf.configuration.Configuration.MDC_INSTANCE_UUID;
 import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
 import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN;
 import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS;
-import static com.att.eelf.configuration.Configuration.MDC_SERVICE_INSTANCE_ID;
 import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME;
 
 /**
@@ -61,7 +60,7 @@ import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME;
  *     - BaseProcessor (for REST sync handling) <br>
  *     - BaseActionRunnable (for REST async handling)
  */
-abstract class BaseCommon {
+public abstract class BaseCommon {
     final EELFLogger logger;
     final ConfigurationHelper configurationHelper;
     final StateHelper stateHelper;
@@ -74,15 +73,14 @@ abstract class BaseCommon {
     CommonHeader commonHeader;
 
     private final List<String> MDC_KEYS = Arrays.asList(
-       LoggingConstants.MDCKeys.PARTNER_NAME,
-       LoggingConstants.MDCKeys.SERVER_NAME,
-       MDC_INSTANCE_UUID,
-       MDC_KEY_REQUEST_ID,
-       MDC_SERVER_FQDN,
-       MDC_SERVER_IP_ADDRESS,
-       MDC_SERVICE_NAME
+        LoggingConstants.MDCKeys.PARTNER_NAME,
+        LoggingConstants.MDCKeys.SERVER_NAME,
+        MDC_INSTANCE_UUID,
+        MDC_KEY_REQUEST_ID,
+        MDC_SERVER_FQDN,
+        MDC_SERVER_IP_ADDRESS,
+        MDC_SERVICE_NAME
     );
-
     private Map<String, String> oldMdcContent = new HashMap<>();
 
     /**
@@ -110,19 +108,19 @@ abstract class BaseCommon {
     void auditInfoLog(Msg msg) {
         LoggingUtils.auditInfo(startTime.toInstant(),
                 new Date(System.currentTimeMillis()).toInstant(),
-                String.valueOf(status.getCode()),
-                status.getMessage(),
-                getClass().getCanonicalName(),
-                msg,
-                configurationHelper.getAppcName(),
-                stateHelper.getCurrentOamState().toString()
+            String.valueOf(status.getCode()),
+            status.getMessage(),
+            getClass().getCanonicalName(),
+            msg,
+            configurationHelper.getAppcName(),
+            stateHelper.getCurrentOamState().toString()
         );
     }
 
     /**
      * Set MDC properties.
      */
-    void setInitialLogProperties() {
+    public final void setInitialLogProperties() {
         MDC.put(MDC_KEY_REQUEST_ID, commonHeader.getRequestId());
         MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, commonHeader.getOriginatorId());
         MDC.put(MDC_INSTANCE_UUID, ""); // value should be created in the future
@@ -141,15 +139,14 @@ abstract class BaseCommon {
     /**
      * Clear MDC properties.
      */
-    void clearRequestLogProperties() {
-        try {
-            MDC.remove(MDC_KEY_REQUEST_ID);
-            MDC.remove(MDC_SERVICE_INSTANCE_ID);
-            MDC.remove(MDC_SERVICE_NAME);
-            MDC.remove(LoggingConstants.MDCKeys.PARTNER_NAME);
-            MDC.remove(LoggingConstants.MDCKeys.TARGET_VIRTUAL_ENTITY);
-        } catch (Exception e) {
-            logger.error("Unable to clear the Request Log properties" + e.getMessage());
+    public final void clearRequestLogProperties() {
+        for (String key : MDC_KEYS) {
+            try {
+                MDC.remove(key);
+            } catch (Exception e) {
+                logger.error(
+                    String.format("Unable to clear the Log properties (%s) due to exception: %s", key, e.getMessage()));
+            }
         }
     }
 
@@ -227,24 +224,24 @@ abstract class BaseCommon {
             errorMessage = EELFResourceManager.format(Msg.OAM_OPERATION_INVALID_INPUT, t.getMessage());
         } else if (t instanceof InvalidStateException) {
             exceptionMessage = String.format(AppcOam.INVALID_STATE_MESSAGE_FORMAT,
-                    rpc.getAppcOperation(), appName, stateHelper.getCurrentOamState());
+                rpc.getAppcOperation(), appName, stateHelper.getCurrentOamState());
             oamCommandStatus = OAMCommandStatus.REJECTED;
             errorMessage = EELFResourceManager.format(Msg.INVALID_STATE_TRANSITION, exceptionMessage);
         } else {
             oamCommandStatus = OAMCommandStatus.UNEXPECTED_ERROR;
             errorMessage = EELFResourceManager.format(Msg.OAM_OPERATION_EXCEPTION, t,
-                    appName, t.getClass().getSimpleName(), rpc.name(), exceptionMessage);
+                appName, t.getClass().getSimpleName(), rpc.name(), exceptionMessage);
         }
 
         setStatus(oamCommandStatus, exceptionMessage);
 
         LoggingUtils.logErrorMessage(
-                String.valueOf(status.getCode()),
-                status.getMessage(),
-                LoggingConstants.TargetNames.APPC,
-                LoggingConstants.TargetNames.APPC_OAM_PROVIDER,
-                errorMessage,
-                AppcOam.class.getCanonicalName());
+            String.valueOf(status.getCode()),
+            status.getMessage(),
+            LoggingConstants.TargetNames.APPC,
+            LoggingConstants.TargetNames.APPC_OAM_PROVIDER,
+            errorMessage,
+            AppcOam.class.getCanonicalName());
 
         resetLogProperties(true);
     }
index 784becc..aa5423d 100644 (file)
@@ -40,6 +40,8 @@ import org.openecomp.appc.statemachine.impl.readers.AppcOamStates;
 
 import java.util.Date;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Base processor for OAM APIs, such as maintenance mode, restart, start and stop API.
@@ -48,11 +50,14 @@ import java.util.concurrent.Future;
  * <p>Specific API processor will overwrite the general methods to add specific behaviors.
  */
 public abstract class BaseProcessor extends BaseCommon {
+    /** lock to serialize incoming OAM operations.  */
+    private static final Object LOCK = new Object();
+
     final AsyncTaskHelper asyncTaskHelper;
     final BundleHelper bundleHelper;
 
-
-    Integer timeoutSeconds;
+    /** the requestTimeoutSeconds to use for this OAM operation */
+    private Integer requestTimeoutSeconds;
     Msg auditMsg;
     BaseActionRunnable runnable;
     private Future<?> scheduledRunnable = null;
@@ -90,7 +95,8 @@ public abstract class BaseProcessor extends BaseCommon {
 
         try {
             preProcess(requestInput);
-            timeoutSeconds = operationHelper.getParamRequestTimeout(requestInput);
+            //The OAM request may specify timeout value
+            requestTimeoutSeconds = operationHelper.getParamRequestTimeout(requestInput);
             scheduleAsyncTask();
         } catch (Exception e) {
             setErrorStatus(e);
@@ -112,13 +118,31 @@ public abstract class BaseProcessor extends BaseCommon {
      * @throws APPCException         when state validation failed
      */
     protected void preProcess(final Object requestInput)
-            throws InvalidInputException, APPCException, InvalidStateException {
+        throws InvalidInputException, APPCException, InvalidStateException,InterruptedException,TimeoutException {
+        setInitialLogProperties();
         operationHelper.isInputValid(requestInput);
 
-        AppcOamStates nextState = operationHelper.getNextState(
-                rpc.getAppcOperation(), stateHelper.getCurrentOamState());
-        setInitialLogProperties();
-        stateHelper.setState(nextState);
+        //All OAM operation pass through here first to validate if an OAM state change is allowed.
+        //If a state change is allowed cancel the occurring OAM (if any) before starting this one.
+        //we will synchronized so that only one can do this at any given time.
+        synchronized(LOCK) {
+            AppcOamStates currentOamState = stateHelper.getCurrentOamState();
+
+            //make sure this OAM operation can transition to the desired OAM operation
+            AppcOamStates nextState = operationHelper.getNextState(
+                    rpc.getAppcOperation(), currentOamState);
+
+            stateHelper.setState(nextState);
+
+            //cancel the  BaseActionRunnable currently executing
+            //it got to be completely terminated before proceeding
+            asyncTaskHelper.cancelBaseActionRunnable(
+                    rpc,
+                    currentOamState,
+                    getTimeoutMilliseconds(),
+                    TimeUnit.MILLISECONDS
+            );
+        }
     }
 
     /**
@@ -135,32 +159,49 @@ public abstract class BaseProcessor extends BaseCommon {
     protected void scheduleAsyncTask() {
         if (runnable == null) {
             logger.error(String.format(
-                    "Skipped schedule async task for rpc(%s) due to runnable is null", rpc.name()));
+                "Skipped schedule async task for rpc(%s) due to runnable is null", rpc.name()));
             return;
         }
 
-        scheduledRunnable = asyncTaskHelper.scheduleAsyncTask(rpc, runnable);
+        scheduledRunnable = asyncTaskHelper.scheduleBaseRunnable(
+            runnable, runnable::abortRunnable, getInitialDelayMillis(), getDelayMillis());
     }
 
+
     /**
-     * Check if current running task is the same as schedule task
-     * @return true if they are the same, otherwise false.
+     * The timeout for this OAM operation. The timeout source is chosen in the following order:
+     * request, config file, default value
+     * @return  - the timeout for this OAM operation.
      */
-    boolean isSameAsyncTask() {
-        return asyncTaskHelper.getCurrentAsyncTask() == scheduledRunnable;
+    long getTimeoutMilliseconds() {
+        return configurationHelper.getOAMOperationTimeoutValue(this.requestTimeoutSeconds);
     }
 
+
     /**
-     * Cancel schedueled async task through AsyncTaskHelper
+     * @return initialDelayMillis - the time to delay first execution of {@link BaseActionRunnable}
+     */
+    protected long getInitialDelayMillis(){
+        return 0L;
+    }
+
+    /**
+     * @return delayMillis the delay between the consecutive executions of  {@link BaseActionRunnable}
+     */
+    private long getDelayMillis(){
+        return 1000L;
+    }
+
+    /**
+     * Cancel the scheduled {@link BaseActionRunnable}  through AsyncTaskHelper
      */
     void cancelAsyncTask() {
         if (scheduledRunnable == null) {
             logger.error(String.format(
-                    "Skipped cancel schedule async task for rpc(%s) due to scheduledRunnable is null", rpc.name()));
+                "Skipped cancel schedule async task for rpc(%s) due to scheduledRunnable is null", rpc.name()));
             return;
         }
-
-        asyncTaskHelper.cancelAsyncTask(scheduledRunnable);
-        scheduledRunnable = null;
+        scheduledRunnable.cancel(true);
     }
+
 }
index 973d0af..6a04660 100644 (file)
@@ -39,6 +39,8 @@ import org.openecomp.appc.requesthandler.LCMStateManager;
 import org.openecomp.appc.requesthandler.RequestHandler;
 import org.openecomp.appc.statemachine.impl.readers.AppcOamStates;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * Processor to handle maintenance mode OAM API.
  */
@@ -65,7 +67,7 @@ public class OamMmodeProcessor extends BaseProcessor {
 
     @Override
     protected void preProcess(final Object requestInput)
-            throws InvalidInputException, InvalidStateException, APPCException {
+            throws InvalidInputException, InvalidStateException, APPCException, InterruptedException, TimeoutException {
         super.preProcess(requestInput);
 
         //Close the gate so that no more new LCM request will be excepted.
@@ -79,15 +81,26 @@ public class OamMmodeProcessor extends BaseProcessor {
         super.scheduleAsyncTask();
     }
 
+    /**
+     * {@inheritDoc}
+     * For maintenance mode we want a longer delay before initial execution of {@link BaseActionRunnable}
+     * so that any accepted LCM actions have time to git scheduled in the Dispatcher.
+     */
+    @Override
+    protected long getInitialDelayMillis(){
+        //wait ten seconds before
+        return 10000L;
+    }
+
     /**
      * This runnable does the async handling for the maintenance mode REST API, and will be scheduled to run
      * until terminating condition reaches.
      *
-     * <p>The runnable will conintue run if: <br>
+     * <p>The runnable will continue run if: <br>
      *   - the runnable is not canceled outside <br>
      *   - the in progress LCM request count is not zero<br>
      * <p> When LCM request count reaches to zero, this runnable will: <br>
-     *     - post message through operatonHelper <br>
+     *     - post message through operationHelper <br>
      *     - set APP-C OAM state to maintenance mode <br>
      *     - audit log the state <br>
      *     - terminate this runnable itself <br>
@@ -98,7 +111,7 @@ public class OamMmodeProcessor extends BaseProcessor {
         MyRunnable(BaseProcessor parent) {
             super(parent);
 
-            actionName = "OAM Maintanence mode";
+            actionName = "OAM Maintenance mode";
             auditMsg = Msg.OAM_OPERATION_MAINTENANCE_MODE;
             finalState = AppcOamStates.MaintenanceMode;
         }
@@ -113,12 +126,6 @@ public class OamMmodeProcessor extends BaseProcessor {
         boolean checkState() {
             logDebug(String.format("Executing %s task", actionName));
 
-            if (!myParent.isSameAsyncTask()) {
-                // cancel myself if I am not the current backgroundOamTask
-                myParent.cancelAsyncTask();
-                logDebug(String.format("Finished %s task due to task removed", actionName));
-                return true;
-            }
 
             boolean hasError = false;
             try {
@@ -156,8 +163,8 @@ public class OamMmodeProcessor extends BaseProcessor {
         @Override
         void keepWaiting() {
             logDebug("The application '%s' has '%s' outstanding LCM request to complete" +
-                            " before coming to a complete maintenance_mode.",
-                    configurationHelper.getAppcName(), inprogressRequestCount);
+                    " before coming to a complete maintenance_mode.",
+                configurationHelper.getAppcName(), inprogressRequestCount);
         }
     }
 }
index e9f0ada..8575789 100644 (file)
@@ -112,14 +112,14 @@ public class OamRestartProcessor extends BaseProcessor {
         @Override
         boolean doAction() {
             logDebug(String.format("Executing %s task at phase (%s)",
-                    actionName, currentPhase == null ? "null" : currentPhase.name()));
+                actionName, currentPhase == null ? "null" : currentPhase.name()));
 
             boolean isBundleOperationCompleted = true;
             try {
                 switch (currentPhase) {
                     case ToStop:
                         isBundleOperationCompleted = bundleHelper.bundleOperations(
-                                AppcOam.RPC.stop, bundleNameToFuture, myParent.asyncTaskHelper);
+                            AppcOam.RPC.stop, bundleNameToFuture, myParent.asyncTaskHelper, this);
                         currentPhase = ActionPhases.Stopped;
                         break;
                     case Stopped:
@@ -129,12 +129,12 @@ public class OamRestartProcessor extends BaseProcessor {
                             currentPhase = ActionPhases.ToStart;
                         } else {
                             logDebug(String.format("%s task is waiting in stopped phase, current state is %s",
-                                    actionName, currentState));
+                                actionName, currentState));
                         }
                         break;
                     case ToStart:
                         isBundleOperationCompleted = bundleHelper.bundleOperations(
-                                AppcOam.RPC.start, bundleNameToFuture, myParent.asyncTaskHelper);
+                            AppcOam.RPC.start, bundleNameToFuture, myParent.asyncTaskHelper, this);
                         currentPhase = ActionPhases.Started;
                         break;
                     case Error:
@@ -143,13 +143,13 @@ public class OamRestartProcessor extends BaseProcessor {
                     default:
                         // Should not reach log it and return false;
                         logger.error("%s task doAction reached %s phase. not supported. return false.",
-                                actionName, currentPhase.name());
+                            actionName, currentPhase.name());
                         stateHelper.setState(AppcOamStates.Error);
                         return false;
                 }
 
                 if (isTimeout("restart doAction")
-                        || hasBundleOperationFailure()) {
+                    || hasBundleOperationFailure()) {
                     currentPhase = ActionPhases.Error;
                     return true;
                 }
index 0060bfc..0d2f505 100644 (file)
@@ -101,7 +101,7 @@ public class OamStartProcessor extends BaseProcessor {
                 if (stateHelper.getState() != AppcOamStates.Started) {
                     logDebug("Start - APPC OAM state is not started, start the bundles");
                     isBundleOperationCompleted = bundleHelper.bundleOperations(
-                            rpc, bundleNameToFuture, myParent.asyncTaskHelper);
+                        rpc, bundleNameToFuture, myParent.asyncTaskHelper, this);
                 }
 
                 if (isBundleOperationCompleted) {
index d819016..d8d88d3 100644 (file)
@@ -94,7 +94,7 @@ public class OamStopProcessor extends BaseProcessor {
 
             try {
                 boolean isBundleOperationCompleted = bundleHelper.bundleOperations(
-                        rpc, bundleNameToFuture, myParent.asyncTaskHelper);
+                    rpc, bundleNameToFuture, myParent.asyncTaskHelper, this);
                 if (isBundleOperationCompleted) {
                     return true;
                 }
index db60337..0a4b868 100644 (file)
@@ -27,34 +27,61 @@ package org.openecomp.appc.oam.util;
 import com.att.eelf.configuration.EELFLogger;
 import org.openecomp.appc.oam.AppcOam;
 import org.openecomp.appc.oam.processor.BaseActionRunnable;
+import org.openecomp.appc.statemachine.impl.readers.AppcOamStates;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.FrameworkUtil;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 
 /**
- * Utility class provides general async task related help.
+ * The AsyncTaskHelper class manages an internal parent child data structure.   The parent is a transient singleton,
+ * meaning only one can exist at any given time.     The parent is scheduled with the
+ * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and is executed at configured interval.   It can be
+ * terminated by using the {@link Future#cancel(boolean)} or the {@link Future#cancel(boolean)} returned from \
+ * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}.
+ * <p>
+ * The children are scheduled using {@link #submitBaseSubCallable(Callable)}} and can only be scheduled if a parent
+ * is scheduled.   Children only execute once, but can be terminated preemptively by the {@link Future#cancel(boolean)}
+ * returned from {@link #submitBaseSubCallable(Callable)} or indirectly by terminating the parent via the method
+ * described above.
+ * <p>
+ * This class augments the meaning of {@link Future#isDone()} in that it guarantees that this method only returns true
+ * if the scheduled {@link Runnable} or {@link Callable}  is not currently executing and is not going to execute in the
+ * future.   This is different than the Java core implementation of {@link Future#isDone()} in which it will return
+ * true immediately after the {@link Future#cancel(boolean)} is called. Even if a Thread is actively executing the
+ * {@link Runnable} or {@link Callable} and has not return yet. See Java BUG JDK-8073704
+ * <p>
+ * The parent {@link Future#isDone()} has an additional augmentation in that it will not return true until all of its
+ * children's {@link Future#isDone()} also return true.
+ *
  */
 @SuppressWarnings("unchecked")
 public class AsyncTaskHelper {
-    final int MMODE_TASK_DELAY = 10000;
-    final int COMMON_INITIAL_DELAY = 0;
-    final int COMMON_INTERVAL = 1000;
 
     private final EELFLogger logger;
     private final ScheduledExecutorService scheduledExecutorService;
     private final ThreadPoolExecutor bundleOperationService;
 
-    /** Reference to the Async task */
-    private volatile Future<?> backgroundOamTask;
-    /** Reference to the runnable of Async task */
-    private volatile BaseActionRunnable taskRunnable;
+    /** Reference to {@link MyFuture} return from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} */
+    private MyFuture backgroundBaseRunnableFuture;
+
+    /** The cancel Callback from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}   */
+    private Consumer<AppcOam.RPC> cancelCallBackForBaseRunnable;
+
+    /** All Futures created by thus calls which have not completed -- {@link Future#isDone()} equals false  */
+    private Set<MyFuture> myFutureSet = new HashSet<>();
 
     /**
      * Constructor
@@ -64,103 +91,162 @@ public class AsyncTaskHelper {
         logger = eelfLogger;
 
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-                (runnable) -> {
-                    Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
-                    return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor");
-                }
+            (runnable) -> {
+                Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
+                return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor");
+            }
         );
 
         bundleOperationService = new ThreadPoolExecutor(
-                0,
-                10,
-                10,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue(),// BlockingQueue<Runnable> workQueue
-                (runnable) -> new Thread(runnable, "OAM bundler operation executor")//ThreadFactory
+            0,
+            10,
+            10,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue(), //BlockingQueue<Runnable> workQueue
+            (runnable) -> {
+                Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
+                return new Thread(runnable, bundle.getSymbolicName() + " bundle operation executor");
+            }
         );
     }
 
-    void addThreadsToPool() {
-        bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize());
-    }
-
-    void removeThreadsFromPoolWhenDone() {
-        bundleOperationService.setCorePoolSize(0);
-    }
-
     /**
      * Terminate the class <bS>ScheduledExecutorService</b>
      */
     public void close() {
         logDebug("Start shutdown scheduleExcutorService.");
-        scheduledExecutorService.shutdown();
-        bundleOperationService.shutdown();
+        bundleOperationService.shutdownNow();
+        scheduledExecutorService.shutdownNow();
         logDebug("Completed shutdown scheduleExcutorService.");
     }
 
+
     /**
-     * Get current async task refernce
-     * @return the class <b>backgroundOamTask</b>
+     * Cancel currently executing {@link BaseActionRunnable} if any.
+     * This method returns immediately if there is currently no {@link BaseActionRunnable} actively executing.
+     * @param rpcCausingAbort - The RPC causing the abort
+     * @param stateBeingAbborted - The current state being canceled
+     * @param timeout - The amount of time to wait for a cancel to complete
+     * @param timeUnit - The unit of time of timeout
+     * @throws TimeoutException - If {@link BaseActionRunnable} has not completely cancelled within the timeout period
+     * @throws InterruptedException - If the Thread waiting for the abort
      */
-    public Future<?> getCurrentAsyncTask() {
-        return backgroundOamTask;
+    public synchronized void cancelBaseActionRunnable(final AppcOam.RPC rpcCausingAbort,
+                                                      AppcOamStates stateBeingAbborted,
+                                                      long timeout, TimeUnit timeUnit)
+        throws TimeoutException,InterruptedException {
+
+        final MyFuture localBackgroundBaseRunnableFuture = backgroundBaseRunnableFuture;
+        final Consumer<AppcOam.RPC> localCancelCallBackForBaseRunnable = cancelCallBackForBaseRunnable;
+
+        if (localBackgroundBaseRunnableFuture == null || localBackgroundBaseRunnableFuture.isDone()) {
+          return;
+        }
+
+        if (localCancelCallBackForBaseRunnable != null) {
+            localCancelCallBackForBaseRunnable.accept(rpcCausingAbort);
+        }
+        localBackgroundBaseRunnableFuture.cancel(true);
+
+        long timeoutMillis = timeUnit.toMillis(timeout);
+        long expiryTime = System.currentTimeMillis() + timeoutMillis;
+        while (!(localBackgroundBaseRunnableFuture.isDone())) {
+            long sleepTime = expiryTime - System.currentTimeMillis();
+            if (sleepTime < 1) {
+                break;
+            }
+            this.wait(sleepTime);
+        }
+
+        if (!localBackgroundBaseRunnableFuture.isDone()) {
+            throw new TimeoutException(String.format("Unable to abort %s in timely manner.",stateBeingAbborted));
+        }
     }
 
     /**
-     * Schedule a service for async task with the passed in parameters
-     * @param rpc of the REST API call, decides how to schedule the service
+     * Schedule a {@link BaseActionRunnable} to begin async execution.   This is the Parent  {@link Runnable} for the
+     * children that are submitted by {@link #submitBaseSubCallable(Callable)}
+     *
+     * The currently executing {@link BaseActionRunnable} must fully be terminated before the next can be scheduled.
+     * This means all Tasks' {@link MyFuture#isDone()} must equal true and all threads must return to their respective
+     * thread pools.
+     *
      * @param runnable of the to be scheduled service.
-     * @return the reference of the scheduled task
+     * @param cancelCallBack to be invoked when
+     *        {@link #cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)} is invoked.
+     * @param initialDelayMillis the time to delay first execution
+     * @param delayMillis the delay between the termination of one
+     * execution and the commencement of the next
+     * @return The {@link BaseActionRunnable}'s {@link Future}
+     * @throws IllegalStateException if there is currently executing Task
      */
-    public Future<?> scheduleAsyncTask(final AppcOam.RPC rpc, final BaseActionRunnable runnable) {
-        int initialDelay, interval;
-        switch (rpc) {
-            case maintenance_mode:
-                initialDelay = interval =MMODE_TASK_DELAY;
-                break;
-            case start:
-            case stop:
-            case restart:
-                initialDelay = COMMON_INITIAL_DELAY;
-                interval = COMMON_INTERVAL;
-                break;
-            default:
-                // should not get here. Log it and return null
-                logDebug(String.format("Cannot scheudle task for unsupported RPC(%s).", rpc.name()));
-                return null;
-        }
-
-        // Always cancel existing  async task
-        if (backgroundOamTask != null) {
-            logDebug("Cancelling background task in schedule task.");
-            backgroundOamTask.cancel(true);
-            if (taskRunnable != null) {
-                taskRunnable.abortRunnable(rpc);
-            }
+    public synchronized Future<?> scheduleBaseRunnable(final Runnable runnable,
+                                                       final Consumer<AppcOam.RPC> cancelCallBack,
+                                                       long initialDelayMillis,
+                                                       long delayMillis)
+        throws IllegalStateException {
+
+        if (backgroundBaseRunnableFuture != null && !backgroundBaseRunnableFuture.isDone()) {
+            throw new IllegalStateException("Unable to schedule background task when one is already running.  All task must fully terminated before another can be scheduled. ");
         }
 
-        taskRunnable = runnable;
-        backgroundOamTask = scheduledExecutorService.scheduleWithFixedDelay(
-                runnable, initialDelay, interval, TimeUnit.MILLISECONDS);
+        this.cancelCallBackForBaseRunnable = cancelCallBack;
 
-        return backgroundOamTask;
-    }
+        backgroundBaseRunnableFuture = new MyFuture(runnable) {
+            /**
+             * augments the cancel operation to cancel all subTack too,
+             */
+            @Override
+            public boolean cancel(final boolean mayInterruptIfRunning) {
+                boolean cancel;
+                synchronized (AsyncTaskHelper.this) {
+                    cancel = super.cancel(mayInterruptIfRunning);
+                    myFutureSet.stream().filter(f->!this.equals(f)).forEach(f->f.cancel(mayInterruptIfRunning));
+                }
+                return cancel;
+            }
 
-    Future<?> submitBundleLcOperation(final Callable callable) {
-        return bundleOperationService.submit(callable);
+            /**
+             * augments the isDone operation to return false until all subTask have completed too.
+             */
+            @Override
+            public boolean isDone() {
+                synchronized (AsyncTaskHelper.this) {
+                    return myFutureSet.isEmpty();
+                }
+            }
+        };
+        backgroundBaseRunnableFuture.setFuture(
+            scheduledExecutorService.scheduleWithFixedDelay(
+                backgroundBaseRunnableFuture, initialDelayMillis, delayMillis, TimeUnit.MILLISECONDS)
+        );
+        return backgroundBaseRunnableFuture;
     }
 
     /**
-     * Cancle a previously schedule task. If the task is the same as backgroundOamTask, set it to null.
-     * @param task to be canceled
+     * Submits children {@link Callable} to be executed as soon as possible,  A parent must have been scheduled
+     * previously via {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}
+     * @param callable the Callable to be submitted
+     * @return The {@link Callable}'s {@link Future}
      */
-    public void cancelAsyncTask(Future<?> task) {
-        task.cancel(false);
-        if (task == backgroundOamTask) {
-            backgroundOamTask = null;
-            taskRunnable = null;
-            logDebug("Cancelling background task in cancel task.");
+    synchronized Future<?> submitBaseSubCallable(final Callable callable) {
+
+        if (backgroundBaseRunnableFuture == null
+            || backgroundBaseRunnableFuture.isCancelled()
+            || backgroundBaseRunnableFuture.isDone()){
+            throw new IllegalStateException("Unable to schedule subCallable when a base Runnable is not running.");
+        }
+
+        //Make sure the pool is ready to go
+        if(bundleOperationService.getPoolSize() != bundleOperationService.getMaximumPoolSize()){
+            bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize());
+            bundleOperationService.prestartAllCoreThreads();
+            bundleOperationService.setCorePoolSize(0);
         }
+
+        MyFuture<?> myFuture = new MyFuture(callable);
+        myFuture.setFuture(bundleOperationService.submit((Callable)myFuture));
+        return myFuture;
     }
 
     /**
@@ -173,4 +259,132 @@ public class AsyncTaskHelper {
             logger.debug(String.format(message, args));
         }
     }
+
+    /**
+     * This class has two purposes.  First it insures  {@link #isDone()} only returns true if the deligate is not
+     * currently running and will not be running in the future: See Java BUG JDK-8073704 Second this class maintains
+     * the {@link #myFutureSet } by insurring that itself is removed when  {@link #isDone()} returns true.
+     *
+     * See {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and {@link #submitBaseSubCallable(Callable)}
+     * for usage of this class
+     */
+    private class MyFuture<T> implements Future<T>, Runnable, Callable<T> {
+
+        private Future<T> future;
+        private final Runnable runnable;
+        private final Callable<T> callable;
+        private boolean isRunning;
+
+        MyFuture(Runnable runnable) {
+            this.runnable = runnable;
+            this.callable = null;
+            myFutureSet.add(this);
+        }
+
+        MyFuture(Callable<T> callable) {
+            this.runnable = null;
+            this.callable = callable;
+            myFutureSet.add(this);
+        }
+
+        void setFuture(Future<T> future) {
+            this.future = future;
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            synchronized (AsyncTaskHelper.this) {
+                if (!isRunning) {
+                    myFutureSetRemove();
+                }
+
+                return future.cancel(mayInterruptIfRunning);
+            }
+        }
+
+        @Override
+        public boolean isCancelled() {
+            synchronized (AsyncTaskHelper.this) {
+                return future.isCancelled();
+            }
+        }
+
+        @Override
+        public boolean isDone() {
+            synchronized (AsyncTaskHelper.this) {
+                return future.isDone() && !isRunning;
+            }
+        }
+
+        @Override
+        public T get() throws InterruptedException, ExecutionException {
+                return future.get();
+        }
+
+        @Override
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return future.get(timeout, unit);
+        }
+
+        @Override
+        public void run() {
+            synchronized (AsyncTaskHelper.this) {
+                if(future.isCancelled()){
+                    return;
+                }
+                isRunning = true;
+            }
+            try {
+                runnable.run();
+            } finally {
+                synchronized (AsyncTaskHelper.this) {
+                    isRunning = false;
+
+                    //The Base Runnable is expected to run again.
+                    //unless it has been canceled.
+                    //so only removed if it is canceled.
+                    if (future.isCancelled()) {
+                        myFutureSetRemove();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public T call() throws Exception {
+            synchronized (AsyncTaskHelper.this) {
+                if(future.isCancelled()){
+                    throw new CancellationException();
+                }
+                isRunning = true;
+            }
+            try {
+                return callable.call();
+            } finally {
+                synchronized (AsyncTaskHelper.this){
+                    isRunning = false;
+                    myFutureSetRemove();
+                }
+            }
+        }
+
+
+        /**
+         * Removes this from the the myFutureSet.
+         * When all the BaseActionRunnable is Done notify any thread waiting in
+         * {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
+         */
+        void myFutureSetRemove(){
+            synchronized (AsyncTaskHelper.this) {
+                myFutureSet.remove(this);
+                if(myFutureSet.isEmpty()){
+                    backgroundBaseRunnableFuture = null;
+                    cancelCallBackForBaseRunnable = null;
+                    AsyncTaskHelper.this.notifyAll();
+
+                }
+            }
+        }
+
+    }
 }
index 7fbb3c4..74159bd 100644 (file)
@@ -28,6 +28,7 @@ import com.att.eelf.configuration.EELFLogger;
 import org.apache.commons.lang3.ArrayUtils;
 import org.openecomp.appc.exceptions.APPCException;
 import org.openecomp.appc.oam.AppcOam;
+import org.openecomp.appc.oam.processor.BaseCommon;
 import org.openecomp.appc.statemachine.impl.readers.AppcOamStates;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
@@ -73,8 +74,9 @@ public class BundleHelper {
      */
     public boolean bundleOperations(AppcOam.RPC rpc,
                                     Map<String, Future<?>> threads,
-                                    AsyncTaskHelper taskHelper)
-            throws APPCException {
+                                    AsyncTaskHelper taskHelper,
+                                    BaseCommon baseCommon)
+        throws APPCException {
         long mStartTime = System.currentTimeMillis();
         logDebug(String.format("Entering OAM bundleOperations with rpc (%s).", rpc.name()));
 
@@ -88,7 +90,6 @@ public class BundleHelper {
         boolean isBundleOperationComplete = true;
 
         Map<String, Bundle> appcLcmBundles = getAppcLcmBundles();
-        taskHelper.addThreadsToPool();
         for (Map.Entry<String, Bundle> bundleEntry : appcLcmBundles.entrySet()) {
             String bundleName = bundleEntry.getKey();
             Bundle bundle = bundleEntry.getValue();
@@ -99,30 +100,29 @@ public class BundleHelper {
                 // such as when a Stop request is receive while APPC is still trying to Start Up.
                 if (!stateHelper.isSameState(originalState)) {
                     logger.warn("OAM %s bundle operation aborted since OAM state is no longer %s!",
-                            originalState.name());
+                        originalState.name());
                     isBundleOperationComplete = false;
                     break;
                 }
             }
 
             threads.put(bundleName,
-                    taskHelper.submitBundleLcOperation(new BundleTask(rpc, bundle)));
+                taskHelper.submitBaseSubCallable(new BundleTask(rpc, bundle,baseCommon)));
         }
-        taskHelper.removeThreadsFromPoolWhenDone();
 
         logDebug(String.format("Leaving OAM bundleOperations with rpc (%s) with complete(%s), elasped (%d) ms.",
-                rpc.name(), Boolean.toString(isBundleOperationComplete), getElaspeTimeMs(mStartTime)));
+            rpc.name(), Boolean.toString(isBundleOperationComplete), getElapseTimeMs(mStartTime)));
 
         return isBundleOperationComplete;
     }
 
-    private long getElaspeTimeMs(long mStartTime) {
+    private long getElapseTimeMs(long mStartTime) {
         return System.currentTimeMillis() - mStartTime;
     }
 
     /**
      * Check if all BundleTasks are completed
-     * @param bundleNameFutureMap with bundler name and BundleTask Future object
+     * @param bundleNameFutureMap with bundle name and BundleTask Future object
      * @return true if all are done, otherwise, false
      */
     public boolean isAllTaskDone(Map<String, Future<?>> bundleNameFutureMap) {
@@ -131,20 +131,22 @@ public class BundleHelper {
     }
 
     /**
-     * Cancel BunldeTasks which are not finished
-     * @param bundleNameFutureMap with bundler name and BundleTask Future object
+     * Cancel BundleTasks which are not finished
+     * @param bundleNameFutureMap with bundle name and BundleTask Future object
      */
     public void cancelUnfinished(Map<String, Future<?>> bundleNameFutureMap) {
-        bundleNameFutureMap.values().stream().filter((f) -> !f.isDone()).forEach((f) -> f.cancel(true));
+        bundleNameFutureMap.values().stream().filter((f)
+            -> !f.isDone()).forEach((f)
+            -> f.cancel(true));
     }
 
     /**
      * Get number of failed BundleTasks
-     * @param bundleNameFurtureMap with bundler name and BundleTask Future object
+     * @param bundleNameFutureMap with bundle name and BundleTask Future object
      * @return number(long) of the failed BundleTasks
      */
-    public long getFailedMetrics(Map<String, Future<?>> bundleNameFurtureMap) {
-        return bundleNameFurtureMap.values().stream().map((f) -> {
+    public long getFailedMetrics(Map<String, Future<?>> bundleNameFutureMap) {
+        return bundleNameFutureMap.values().stream().map((f) -> {
             try {
                 return f.get();
             } catch (Exception e) {
@@ -168,10 +170,10 @@ public class BundleHelper {
         BundleFilter bundleList = new BundleFilter(bundlesToStop, regExBundleNotStop, getBundleList());
 
         logger.info(String.format("(%d) APPC bundles to Stop/Start: %s.", bundleList.getBundlesToStop().size(),
-                bundleList.getBundlesToStop().toString()));
+            bundleList.getBundlesToStop().toString()));
 
         logger.debug(String.format("(%d) APPC bundles that won't be Stopped/Started: %s.",
-                bundleList.getBundlesToNotStop().size(), bundleList.getBundlesToNotStop().toString()));
+            bundleList.getBundlesToNotStop().size(), bundleList.getBundlesToNotStop().toString()));
 
         return bundleList.getBundlesToStop();
     }
@@ -229,17 +231,21 @@ public class BundleHelper {
         private Bundle bundle;
         private String bundleName;
         private String actionName;
+        private final BaseCommon baseCommon;
 
-        BundleTask(AppcOam.RPC rpcIn, Bundle bundleIn) {
+        BundleTask(AppcOam.RPC rpcIn, Bundle bundleIn, BaseCommon baseCommon) {
             rpc = rpcIn;
             actionName = rpc.getAppcOperation().toString();
             bundle = bundleIn;
             bundleName = bundle.getSymbolicName();
+            this.baseCommon = baseCommon;
         }
 
         @Override
         public BundleTask call() throws Exception {
             try {
+                baseCommon.setInitialLogProperties();
+
                 long bundleOperStartTime = System.currentTimeMillis();
                 logDebug(String.format("OAM %s bundle %s ===>", actionName, bundleName));
                 switch (rpc) {
@@ -253,12 +259,15 @@ public class BundleHelper {
                         // should do nothing
                 }
                 logDebug(String.format("OAM %s bundle %s completed <=== elasped %d",
-                        actionName, bundleName, getElaspeTimeMs(bundleOperStartTime)));
+                    actionName, bundleName, getElapseTimeMs(bundleOperStartTime)));
             } catch (BundleException e) {
                 logger.error(String.format("Exception encountered when OAM %s bundle %s ",
-                        actionName, bundleName), e);
+                    actionName, bundleName), e);
                 failException = e;
             }
+            finally {
+                baseCommon.clearRequestLogProperties();
+            }
             return this;
         }
     }
index c465b9b..6e6ab03 100644 (file)
@@ -30,12 +30,17 @@ import org.openecomp.appc.Constants;
 import org.openecomp.appc.configuration.Configuration;
 import org.openecomp.appc.configuration.ConfigurationFactory;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Utility class provides general configuration helps
  */
 public class ConfigurationHelper {
     final static String PROP_KEY_APPC_NAME = Constants.PROPERTY_APPLICATION_NAME;
     final static String PROP_KEY_METRIC_STATE = "metric.enabled";
+    private final String OAM_OPERATION_TIMEOUT_SECOND = "appc.OAM.api.timeout";
+    /** Default operation timeout set to 1 minute */
+    private final int DEFAULT_OAM_OPERATION_TIMEOUT = 60;
 
     private final EELFLogger logger;
     private Configuration configuration = ConfigurationFactory.getConfiguration();
@@ -57,7 +62,7 @@ public class ConfigurationHelper {
     }
 
     /**
-     * Read property value of a specified proeprty key
+     * Read property value of a specified property key
      *
      * @param propertyKey string of the property key
      * @return String[] of the property values associated with the propertyKey
@@ -77,4 +82,23 @@ public class ConfigurationHelper {
         }
         return new String[]{propertyValue};
     }
+
+
+
+
+
+    /**
+     * This method returns timeout in milliseconds.  The source is chosen in the following order:
+     * The overrideTimeoutSeconds argument
+     * or {@link #OAM_OPERATION_TIMEOUT_SECOND} found in the configuration file
+     * or the {@link #DEFAULT_OAM_OPERATION_TIMEOUT}
+     * @param overrideTimeoutSeconds  or null to us the other sources
+     * @return timeout in milliseconds
+     */
+    public long getOAMOperationTimeoutValue(Integer overrideTimeoutSeconds) {
+        return overrideTimeoutSeconds == null ?
+            getConfig().getIntegerProperty(OAM_OPERATION_TIMEOUT_SECOND, DEFAULT_OAM_OPERATION_TIMEOUT) * 1000
+            :
+            TimeUnit.MILLISECONDS.toMillis(overrideTimeoutSeconds);
+    }
 }
index c5ad95e..07500f4 100644 (file)
@@ -22,7 +22,6 @@
  * ============LICENSE_END=========================================================
  */
 
-
 package org.openecomp.appc.oam.processor;
 
 import com.att.eelf.configuration.EELFLogger;
@@ -31,7 +30,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.yang.gen.v1.org.openecomp.appc.oam.rev170303.common.header.CommonHeader;
-import org.openecomp.appc.configuration.Configuration;
 import org.openecomp.appc.i18n.Msg;
 import org.openecomp.appc.oam.AppcOam;
 import org.openecomp.appc.oam.OAMCommandStatus;
@@ -46,7 +44,6 @@ import org.powermock.reflect.Whitebox;
 import java.util.Date;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyMap;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -102,7 +99,6 @@ public class BaseActionRunnableTest {
     private StateHelper mockStateHelper = mock(StateHelper.class);
     private OperationHelper mockOperHelper = mock(OperationHelper.class);
     private ConfigurationHelper mockConfigHelper = mock(ConfigurationHelper.class);
-    private Configuration mockConfig = mock(Configuration.class);
     private BundleHelper mockBundleHelper = mock(BundleHelper.class);
 
     @SuppressWarnings("ResultOfMethodCallIgnored")
@@ -111,11 +107,8 @@ public class BaseActionRunnableTest {
         // to avoid operation on logger fail, mock up the logger
         EELFLogger mockLogger = mock(EELFLogger.class);
 
-        Mockito.doReturn(mockConfig).when(mockConfigHelper).getConfig();
-        Mockito.doReturn(10).when(mockConfig).getIntegerProperty(any(), anyInt());
-
         testProcessor = spy(
-                new TestProcessor(mockLogger, mockConfigHelper, mockStateHelper, null, mockOperHelper));
+            new TestProcessor(mockLogger, mockConfigHelper, mockStateHelper, null, mockOperHelper));
         Whitebox.setInternalState(testProcessor, "bundleHelper", mockBundleHelper);
 
         testBaseAcionRunnable = spy(new TestAbc(testProcessor));
@@ -127,41 +120,41 @@ public class BaseActionRunnableTest {
         Whitebox.setInternalState(testBaseAcionRunnable, "timeoutMs", 0);
         Whitebox.setInternalState(testBaseAcionRunnable, "startTimeMs", 0);
         Whitebox.setInternalState(testBaseAcionRunnable, "doTimeoutChecking", false);
+        long expectedTimeout = 10000L;
+        Mockito.doReturn(expectedTimeout).when(mockConfigHelper).getOAMOperationTimeoutValue(any());
         testBaseAcionRunnable.setTimeoutValues();
-        Assert.assertEquals("Should set timeoutMs", 10 * 1000, testBaseAcionRunnable.timeoutMs);
+        Assert.assertEquals("Should set timeoutMs", expectedTimeout, testBaseAcionRunnable.timeoutMs);
         Assert.assertTrue("Should set start time MS", testBaseAcionRunnable.startTimeMs != 0);
         Assert.assertTrue("Should do check", testBaseAcionRunnable.doTimeoutChecking);
 
         Whitebox.setInternalState(testBaseAcionRunnable, "timeoutMs", 0);
         Whitebox.setInternalState(testBaseAcionRunnable, "startTimeMs", 0);
         Whitebox.setInternalState(testBaseAcionRunnable, "doTimeoutChecking", false);
-        int timeoutSeconds = 20;
-        Whitebox.setInternalState(testProcessor, "timeoutSeconds", timeoutSeconds);
+        expectedTimeout = 20000L;
+        Mockito.doReturn(expectedTimeout).when(mockConfigHelper).getOAMOperationTimeoutValue(any());
         testBaseAcionRunnable.setTimeoutValues();
-        Assert.assertEquals("Should set timeoutMs", timeoutSeconds * 1000, testBaseAcionRunnable.timeoutMs);
+        Assert.assertEquals("Should set timeoutMs", expectedTimeout, testBaseAcionRunnable.timeoutMs);
         Assert.assertTrue("Should set start time MS", testBaseAcionRunnable.startTimeMs != 0);
         Assert.assertTrue("Should do check", testBaseAcionRunnable.doTimeoutChecking);
 
         Whitebox.setInternalState(testBaseAcionRunnable, "timeoutMs", 0);
         Whitebox.setInternalState(testBaseAcionRunnable, "startTimeMs", 0);
         Whitebox.setInternalState(testBaseAcionRunnable, "doTimeoutChecking", false);
-
-        timeoutSeconds = 0;
-        Whitebox.setInternalState(testProcessor, "timeoutSeconds", timeoutSeconds);
-        Mockito.doReturn(0).when(mockConfig).getIntegerProperty(
-                testBaseAcionRunnable.OAM_OPERATION_TIMEOUT_SECOND, testBaseAcionRunnable.DEFAULT_OAM_OPERATION_TIMEOUT);
+        expectedTimeout = 0L;
+        Mockito.doReturn(expectedTimeout).when(mockConfigHelper).getOAMOperationTimeoutValue(any());
         testBaseAcionRunnable.setTimeoutValues();
-        Assert.assertEquals("Should set timeoutMs", timeoutSeconds * 1000, testBaseAcionRunnable.timeoutMs);
+        Assert.assertEquals("Should set timeoutMs", expectedTimeout, testBaseAcionRunnable.timeoutMs);
         Assert.assertTrue("Should not set start time MS", testBaseAcionRunnable.startTimeMs == 0);
         Assert.assertFalse("Should not do check", testBaseAcionRunnable.doTimeoutChecking);
     }
+
     @Test
     public void testRun() throws Exception {
         // test doAction failed
         Whitebox.setInternalState(testBaseAcionRunnable, "doActionResult", false);
         testBaseAcionRunnable.run();
         Assert.assertFalse("isWaiting should still be false",
-                Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting"));
+            Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting"));
 
         // test doAction success
         Whitebox.setInternalState(testBaseAcionRunnable, "doActionResult", true);
@@ -170,13 +163,13 @@ public class BaseActionRunnableTest {
         Mockito.doReturn(true).when(testBaseAcionRunnable).checkState();
         testBaseAcionRunnable.run();
         Assert.assertFalse("isWaiting should still be false",
-                Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting"));
+            Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting"));
 
         // with checkState return false
         Mockito.doReturn(false).when(testBaseAcionRunnable).checkState();
         testBaseAcionRunnable.run();
         Assert.assertTrue("isWaiting should still be true",
-                Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting"));
+            Whitebox.getInternalState(testBaseAcionRunnable, "isWaiting"));
 
         // should stay
         testBaseAcionRunnable.run();
@@ -187,11 +180,11 @@ public class BaseActionRunnableTest {
     public void testSetAbortStatus() throws Exception {
         testBaseAcionRunnable.setAbortStatus();
         Assert.assertEquals("Should return abort code", OAMCommandStatus.ABORT.getResponseCode(),
-                testBaseAcionRunnable.status.getCode().intValue());
+            testBaseAcionRunnable.status.getCode().intValue());
         Assert.assertTrue("Should set abort due to execution error message",
-                testBaseAcionRunnable.status.getMessage().endsWith(
-                        String.format(testBaseAcionRunnable.ABORT_MESSAGE_FORMAT,
-                                testRpc.name(), testBaseAcionRunnable.DUE_TO_EXECUTION_ERROR)));
+            testBaseAcionRunnable.status.getMessage().endsWith(
+                String.format(testBaseAcionRunnable.ABORT_MESSAGE_FORMAT,
+                    testRpc.name(), testBaseAcionRunnable.DUE_TO_EXECUTION_ERROR)));
     }
 
     @Test
@@ -261,10 +254,10 @@ public class BaseActionRunnableTest {
         Assert.assertTrue("Should be timeout", testBaseAcionRunnable.isTimeout(parentName));
         Mockito.verify(testBaseAcionRunnable, times(1)).postAction(any());
         Assert.assertEquals("Should return timeout code", OAMCommandStatus.TIMEOUT.getResponseCode(),
-                testBaseAcionRunnable.status.getCode().intValue());
+            testBaseAcionRunnable.status.getCode().intValue());
         Assert.assertTrue("Should set timeout message",
-                testBaseAcionRunnable.status.getMessage().endsWith(
-                        String.format(testBaseAcionRunnable.TIMEOUT_MESSAGE_FORMAT, testRpc.name(), timeoutMs)));
+            testBaseAcionRunnable.status.getMessage().endsWith(
+                String.format(testBaseAcionRunnable.TIMEOUT_MESSAGE_FORMAT, testRpc.name(), timeoutMs)));
     }
 
     @SuppressWarnings("unchecked")
@@ -277,9 +270,8 @@ public class BaseActionRunnableTest {
         long failedNumber = 1;
         Mockito.doReturn(failedNumber).when(mockBundleHelper).getFailedMetrics(anyMap());
         Assert.assertTrue("should return true", testBaseAcionRunnable.hasBundleOperationFailure());
-        Mockito.verify(testBaseAcionRunnable,
-                times(1)).setStatus(OAMCommandStatus.UNEXPECTED_ERROR,
-                String.format(testBaseAcionRunnable.BUNDLE_OPERATION_FAILED_FORMAT, failedNumber));
+        Mockito.verify(testBaseAcionRunnable, times(1)).setStatus(OAMCommandStatus.UNEXPECTED_ERROR,
+            String.format(testBaseAcionRunnable.BUNDLE_OPERATION_FAILED_FORMAT, failedNumber));
         Mockito.verify(testBaseAcionRunnable, times(1)).postAction(AppcOamStates.Error);
     }
 
@@ -289,11 +281,11 @@ public class BaseActionRunnableTest {
         AppcOam.RPC newRpc = AppcOam.RPC.restart;
         testBaseAcionRunnable.abortRunnable(newRpc);
         Assert.assertEquals("Should return abort code", OAMCommandStatus.ABORT.getResponseCode(),
-                testBaseAcionRunnable.status.getCode().intValue());
+            testBaseAcionRunnable.status.getCode().intValue());
         Assert.assertTrue("Should set abort due to new request message",
-                testBaseAcionRunnable.status.getMessage().endsWith(
-                        String.format(testBaseAcionRunnable.ABORT_MESSAGE_FORMAT, testRpc.name(),
-                                String.format(testBaseAcionRunnable.NEW_RPC_OPERATION_REQUEST, newRpc.name()))));
+            testBaseAcionRunnable.status.getMessage().endsWith(
+                String.format(testBaseAcionRunnable.ABORT_MESSAGE_FORMAT, testRpc.name(),
+                    String.format(testBaseAcionRunnable.NEW_RPC_OPERATION_REQUEST, newRpc.name()))));
         Mockito.verify(mockOperHelper, times(1)).sendNotificationMessage(any(), any(), any());
         Mockito.verify(testBaseAcionRunnable, times(1)).resetLogProperties(false);
         Mockito.verify(testBaseAcionRunnable, times(1)).resetLogProperties(true);
index 3a9e76f..3f51669 100644 (file)
@@ -163,11 +163,20 @@ public class BaseCommonTest {
         testBaseCommon.setInitialLogProperties();
 
         testBaseCommon.resetLogProperties(false);
-        Mockito.verify(testBaseCommon, times(2)).setInitialLogProperties();
+        Mockito.verify(mockCommonHeader, times(2)).getRequestId();
+        Mockito.verify(mockCommonHeader, times(2)).getOriginatorId();
         Map<String, String> oldMdcMap = Whitebox.getInternalState(testBaseCommon, "oldMdcContent");
         Assert.assertTrue("Should have 5 entries in persisted map", oldMdcMap.size() == 5);
 
         testBaseCommon.resetLogProperties(false);
-        Mockito.verify(testBaseCommon, times(3)).setInitialLogProperties();
+        Mockito.verify(mockCommonHeader, times(3)).getRequestId();
+        Mockito.verify(mockCommonHeader, times(3)).getOriginatorId();
+
+        // test oldMdcMap is cleared
+        testBaseCommon.resetLogProperties(false);
+        Mockito.verify(mockCommonHeader, times(4)).getRequestId();
+        Mockito.verify(mockCommonHeader, times(4)).getOriginatorId();
+        oldMdcMap = Whitebox.getInternalState(testBaseCommon, "oldMdcContent");
+        Assert.assertTrue("Should have 5 entries in persisted map", oldMdcMap.size() == 5);
     }
 }
index 354053c..0109adf 100644 (file)
@@ -22,7 +22,6 @@
  * ============LICENSE_END=========================================================
  */
 
-
 package org.openecomp.appc.oam.processor;
 
 import com.att.eelf.configuration.EELFLogger;
@@ -46,8 +45,6 @@ import org.openecomp.appc.oam.util.StateHelper;
 import org.openecomp.appc.statemachine.impl.readers.AppcOamStates;
 import org.powermock.reflect.Whitebox;
 
-import java.util.concurrent.Future;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -97,7 +94,7 @@ public class BaseProcessorTest {
         Mockito.doReturn(mockCommonHeader).when(mockInput).getCommonHeader();
 
         testBaseProcessor = spy(
-                new TestAbc(null, mockConfigHelper, mockStateHelper, mockTaskHelper, mockOperHelper));
+            new TestAbc(null, mockConfigHelper, mockStateHelper, mockTaskHelper, mockOperHelper));
 
         Whitebox.setInternalState(testBaseProcessor, "commonHeader", mockCommonHeader);
 
@@ -112,7 +109,7 @@ public class BaseProcessorTest {
         Mockito.doThrow(new InvalidInputException("test")).when(mockOperHelper).isInputValid(mockInput);
         Status status = testBaseProcessor.processRequest(mockInput);
         Assert.assertEquals("Should return reject",
-                OAMCommandStatus.INVALID_PARAMETER.getResponseCode(), status.getCode().intValue());
+            OAMCommandStatus.INVALID_PARAMETER.getResponseCode(), status.getCode().intValue());
     }
 
     @Test
@@ -122,7 +119,7 @@ public class BaseProcessorTest {
         Mockito.doReturn(mockCommonHeader).when(mockInput).getCommonHeader();
         Status status = testBaseProcessor.processRequest(mockInput);
         Assert.assertEquals("Should return success",
-                OAMCommandStatus.ACCEPTED.getResponseCode(), status.getCode().intValue());
+            OAMCommandStatus.ACCEPTED.getResponseCode(), status.getCode().intValue());
     }
 
     @Test(expected = InvalidInputException.class)
@@ -135,7 +132,7 @@ public class BaseProcessorTest {
     public void testPreProcessWithInvalidState() throws Exception {
         Mockito.doReturn(currentState).when(mockStateHelper).getCurrentOamState();
         Mockito.doThrow(new InvalidStateException("test"))
-                .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState);
+            .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState);
         testBaseProcessor.preProcess(mockInput);
     }
 
@@ -143,7 +140,7 @@ public class BaseProcessorTest {
     public void testPreProcessWithAppcException() throws Exception {
         Mockito.doReturn(currentState).when(mockStateHelper).getCurrentOamState();
         Mockito.doThrow(new APPCException("test"))
-                .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState);
+            .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState);
         testBaseProcessor.preProcess(mockInput);
     }
 
@@ -152,7 +149,7 @@ public class BaseProcessorTest {
         Mockito.doReturn(currentState).when(mockStateHelper).getCurrentOamState();
         AppcOamStates nextState = AppcOamStates.Starting;
         Mockito.doReturn(nextState)
-                .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState);
+            .when(mockOperHelper).getNextState(testRpc.getAppcOperation(), currentState);
         testBaseProcessor.preProcess(mockInput);
         Mockito.verify(mockOperHelper, times(1)).isInputValid(mockInput);
         Mockito.verify(mockOperHelper, times(1)).getNextState(testRpc.getAppcOperation(), currentState);
@@ -163,32 +160,14 @@ public class BaseProcessorTest {
     public void testScheduleAsyncTask() throws Exception {
         // test no runnable
         testBaseProcessor.scheduleAsyncTask();
-        Mockito.verify(mockTaskHelper, times(0)).scheduleAsyncTask(any(), any());
+        Assert.assertTrue(Whitebox.getInternalState(testBaseProcessor, "runnable") == null);
+        Assert.assertTrue(Whitebox.getInternalState(testBaseProcessor, "scheduledRunnable") == null);
 
         BaseActionRunnable mockRunnable = mock(BaseActionRunnable.class);
         Whitebox.setInternalState(testBaseProcessor, "runnable", mockRunnable);
         testBaseProcessor.scheduleAsyncTask();
-        Mockito.verify(mockTaskHelper, times(1)).scheduleAsyncTask(testRpc, mockRunnable);
-    }
-
-    @Test
-    public void isSameAsyncTask() throws Exception {
-        Future<?> mockTask1 = mock(Future.class);
-        Whitebox.setInternalState(testBaseProcessor, "scheduledRunnable", mockTask1);
-        Mockito.doReturn(mockTask1).when(mockTaskHelper).getCurrentAsyncTask();
-        Assert.assertTrue("Shoudl be the same", testBaseProcessor.isSameAsyncTask());
-
-        Future<?> mockTask2 = mock(Future.class);
-        Mockito.doReturn(mockTask2).when(mockTaskHelper).getCurrentAsyncTask();
-        Assert.assertFalse("Shoudl not be the same", testBaseProcessor.isSameAsyncTask());
-    }
-
-    @Test
-    public void cancleAsyncTask() throws Exception {
-        Future<?> mockTask = mock(Future.class);
-        Whitebox.setInternalState(testBaseProcessor, "scheduledRunnable", mockTask);
-        testBaseProcessor.cancelAsyncTask();
-        Mockito.verify(mockTaskHelper, times(1)).cancelAsyncTask(mockTask);
+        // scheduledRunnable should still be null, there's no mock done
+        // as I have trouble to make mockTaskHelper.scheduleBaseRunnable to return a proper Future
         Assert.assertTrue(Whitebox.getInternalState(testBaseProcessor, "scheduledRunnable") == null);
     }
 
index 873b217..f81938e 100644 (file)
 package org.openecomp.appc.oam.util;
 
 import com.att.eelf.configuration.EELFLogger;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mockito;
 import org.openecomp.appc.oam.AppcOam;
-import org.openecomp.appc.oam.processor.BaseActionRunnable;
-import org.powermock.reflect.Whitebox;
+import org.openecomp.appc.statemachine.impl.readers.AppcOamStates;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.FrameworkUtil;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({FrameworkUtil.class})
 public class AsyncTaskHelperTest {
     private AsyncTaskHelper asyncTaskHelper;
-    private ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
-    private BaseActionRunnable mockRunnable = mock(BaseActionRunnable.class);
+
+    private long initialDelayMillis = 0;
+    private long delayMillis = 10;
+
 
     @Before
     public void setUp() throws Exception {
-        asyncTaskHelper = new AsyncTaskHelper(null);
 
-        Whitebox.setInternalState(asyncTaskHelper, "scheduledExecutorService", mockScheduler);
         // to avoid operation on logger fail, mock up the logger
         EELFLogger mockLogger = mock(EELFLogger.class);
-        Whitebox.setInternalState(asyncTaskHelper, "logger", mockLogger);
+
+
+        mockStatic(FrameworkUtil.class);
+        Bundle myBundle = mock(Bundle.class);
+        Mockito.doReturn("TestBundle").when(myBundle).getSymbolicName();
+        PowerMockito.when(FrameworkUtil.getBundle(any())).thenReturn(myBundle);
+
+        asyncTaskHelper = new AsyncTaskHelper(mockLogger);
+
+
     }
 
-    @Test
-    public void testClose() throws Exception {
+
+    @After
+    public void shutdown(){
         asyncTaskHelper.close();
-        Mockito.verify(mockScheduler, times(1)).shutdown();
     }
 
+
+    /**
+     * Test that Base Runnable
+     *
+     * Runs at a fix rate;
+     * Only one Base Runnable  can be scheduled at time;
+     * Future.cancle stops the Base Runnable;
+     * That another Base Runnable  can be scheduled once the previous isDone.
+     */
+    @Test
+    public void test_scheduleBaseRunnable_Base_isDone() throws Exception{
+
+
+
+        //loop is to test we can run consecutive Base Runnable
+        for(int testIteration = 0; testIteration < 3;testIteration++){
+            final ExecuteTest et = new ExecuteTest();
+
+            Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
+                    et::test
+                    , s -> { }
+                    ,initialDelayMillis
+                    ,delayMillis
+            );
+
+            //make sure it is running at a fix rate
+            Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
+            Assert.assertFalse("It Should not be Done", future.isDone());
+            Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
+            Assert.assertFalse("It Should not be Done", future.isDone());
+
+
+            //make sure a seconds Runnable cannot be scheduled when one is already running
+            try {
+                asyncTaskHelper.scheduleBaseRunnable(et::test
+                        , s -> {}
+                        ,initialDelayMillis
+                        ,delayMillis
+                );
+                Assert.fail("scheduling should have been prevented.  ");
+            } catch (IllegalStateException e) {
+                //IllegalStateException means the second scheduling was not allowed.
+            }
+
+
+            //let it cancel itself
+            et.cancelSelfOnNextExecution(future);
+
+            //it should be done after it executes itself one more time.
+            Assert.assertTrue("it should be done", waitFor(future::isDone, 5000));
+            Assert.assertTrue("The test failed to execute", et.isExecuted);
+        }
+
+
+    }
+
+
+    /**
+     * Makes sure the Future.isDone one only returns true if its runnable is not currently executing and will not
+     * execute in the future.  Default implementation of isDone() returns true immediately after the future is
+     * canceled -- Even if is there is still a thread actively executing the runnable
+     */
     @Test
-    public void testGetCurrentAsyncTask() throws Exception {
-        Future<?> mockTask = mock(Future.class);
-        Whitebox.setInternalState(asyncTaskHelper, "backgroundOamTask", mockTask);
-        Assert.assertEquals("Should return mock task", mockTask, asyncTaskHelper.getCurrentAsyncTask());
+    public void test_scheduleBaseRunnable_Base_isDone_Ignore_Interrupt() throws Exception{
+
+
+        final ExecuteTest et = new ExecuteTest();
+
+        //configure test to run long and ignore interrupt
+        et.isContinuous = true;
+        et.isIgnoreInterrupt = true;
+
+
+
+        Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
+                et::test
+                , s->{}
+                ,initialDelayMillis
+                ,delayMillis
+        );
+
+        //make sure it is running
+        Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
+        Assert.assertTrue("It should be running",et.waitForTestExec(1000));
+        Assert.assertFalse("It Should not be Done", future.isDone());
+
+        //cancel it and make sure it is still running
+        future.cancel(true);
+        Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
+        Assert.assertTrue("It should be running",et.waitForTestExec(1000));
+        Assert.assertFalse("It Should not be Done", future.isDone());
+
+        //let the thread die and then make sure its done
+        et.isContinuous = false;
+        Assert.assertTrue("It should not be running",waitForNot(et::isExecuting,1000));
+        Assert.assertTrue("It Should be Done", future.isDone());
+
     }
 
+
+
+
+    /**
+     * Make sure the base Future.isDone returns false until the sub callable has completed execution.
+     */
     @Test
-    public void testScheduleAsyncTaskWithMmod() throws Exception {
-        // test maintenance mode
-        ScheduledFuture<?> mockTask0 = mock(ScheduledFuture.class);
-        Whitebox.setInternalState(asyncTaskHelper, "backgroundOamTask", mockTask0);
+    public void test_scheduleBaseRunnable_SubTask_isDone_Ignore_Interrupt() throws Exception{
+
+
+        final ExecuteTest baseET = new ExecuteTest();
+        final ExecuteTest subET = new ExecuteTest();
+
+        //configure sub test to run long and ignore interrupt
+        subET.isContinuous = true;
+        subET.isIgnoreInterrupt = true;
+
+
+        //schedule the Base test to run and make sure it is running.
+        Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
+                baseET::test
+                ,s->{}
+                ,initialDelayMillis
+                ,delayMillis
+                );
+        Assert.assertTrue("baseET should be running",waitFor(baseET::isExecuted,1000));
+        Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
+
+
+        //schedule the sub task and make sure it is  running
+        Future<?> subFuture = asyncTaskHelper.submitBaseSubCallable(subET::test);
+        Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
+        Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
+        Assert.assertFalse("subET Should not be Done", subFuture.isDone());
+        Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
+
+        //cancel the base task and make sure isDone is still false
+        baseFuture.cancel(true);
+        Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
+        Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
+        Assert.assertFalse("subET Should not be Done",subFuture.isDone());
+        Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
+
+
+        //let the sub task die and and make sure the base is now finally done
+        subET.isContinuous = false;
+        Assert.assertTrue("subET should not be running",waitForNot(subET::isExecuting,1000));
+        Assert.assertTrue("subET Should be Done", subFuture.isDone());
+        Assert.assertTrue("baseET Should be Done", baseFuture.isDone());
 
-        ScheduledFuture<?> mockTask1 = mock(ScheduledFuture.class);
-        Mockito.doReturn(mockTask1).when(mockScheduler).scheduleWithFixedDelay(
-                mockRunnable, asyncTaskHelper.MMODE_TASK_DELAY,
-                asyncTaskHelper.MMODE_TASK_DELAY, TimeUnit.MILLISECONDS);
-        asyncTaskHelper.scheduleAsyncTask(AppcOam.RPC.maintenance_mode, mockRunnable);
-        Mockito.verify(mockTask0, times(1)).cancel(true);
-        Assert.assertEquals(mockTask1, asyncTaskHelper.scheduleAsyncTask(AppcOam.RPC.maintenance_mode, mockRunnable));
-        Assert.assertEquals("Should set backgroundOamTask", mockTask1, asyncTaskHelper.getCurrentAsyncTask());
     }
 
+
+    /**
+     * Make sure the base Future.isDone returns false until the 3 sub callable has completed execution.
+     * Each sub callable will be shutdown one at a time.
+     */
     @Test
-    public void testScheduleAsyncTaskWithStart() throws Exception {
-        for (AppcOam.RPC rpc : Arrays.asList(AppcOam.RPC.start, AppcOam.RPC.stop, AppcOam.RPC.restart)) {
-            runTest(rpc);
+    public void test_scheduleBaseRunnable_SubTasks_isDone() throws Exception {
+
+
+        //loop is to test we can run consecutive Base Runnable
+        for (int testIteration = 0; testIteration < 3; testIteration++) {
+            final ExecuteTest baseET = new ExecuteTest();
+            final LinkedList<Sub> subList = new LinkedList<>();
+            for (int i = 0; i < 3; i++) {
+                Sub sub = new Sub();
+                sub.et.isContinuous = true;
+                subList.add(sub);
+            }
+
+
+            //schedule the base runnable and make sure it is running
+            Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
+                    baseET::test
+                    , s -> {
+                    }
+                    , initialDelayMillis
+                    , delayMillis
+            );
+            Assert.assertTrue("baseET should be running", waitFor(baseET::isExecuted, 1000));
+            Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
+
+
+            //schedule the sub Callables and make sure these are running
+            subList.forEach(sub -> sub.future = asyncTaskHelper.submitBaseSubCallable(sub.et::test));
+            for (Sub sub : subList) {
+                Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 100));
+                Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
+                Assert.assertFalse("subET Should not be Done", sub.future.isDone());
+            }
+            Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
+
+
+            //On each iteration shut down a sub callable.  Make sure it stops, the others are still running and the
+            // //base is still running.
+            while (!subList.isEmpty()) {
+
+                //stop one sub and make sure it stopped
+                {
+                    Sub sub = subList.removeFirst();
+                    Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
+                    sub.et.isContinuous = false;
+                    Assert.assertTrue("subET should not be running", waitForNot(sub.et::isExecuting,1000));
+                    Assert.assertTrue("subET Should not be Done", sub.future.isDone());
+                }
+
+                //make sure the other are still running
+                for (Sub sub : subList) {
+                    Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
+                    Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
+                    Assert.assertFalse("subET Should not be Done", sub.future.isDone());
+                }
+
+                //Make sure the Base is still running
+                Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
+            }
+
+            //let the base cancel itself and make sure it stops
+            baseET.cancelSelfOnNextExecution(baseFuture);
+            Assert.assertTrue("baseET should be done", waitFor(baseFuture::isDone, 1000));
+        }
+    }
+
+
+    /**
+     * Make sure SubCallable cannot be scheduled when there is not BaseRunnable
+     */
+    @Test(expected=IllegalStateException.class)
+    public void test_SubTasksScheduleFailWhenNoBase() throws Exception {
+        asyncTaskHelper.submitBaseSubCallable(()->null);
+    }
+
+
+
+    /**
+     * Make sure SubCallable cannot be scheduled when BaseRunnable is cancelled but is still actively running.
+     */
+    @Test(expected=IllegalStateException.class)
+    public void test_SubTasksScheduleFailWhenBaseCanceledBeforeisDone() throws Exception {
+
+        final ExecuteTest et = new ExecuteTest();
+        et.isContinuous = true;
+
+        Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
+                et::test
+                , s -> { }
+                ,initialDelayMillis
+                ,delayMillis
+        );
+
+        Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
+        future.cancel(false);
+        Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
+
+        try {
+            asyncTaskHelper.submitBaseSubCallable(() -> null);
+        } finally {
+            et.isContinuous = false;
         }
+
+
+
     }
 
-    private void runTest(AppcOam.RPC rpc) {
-        ScheduledFuture<?> mockTask0 = mock(ScheduledFuture.class);
-        Whitebox.setInternalState(asyncTaskHelper, "backgroundOamTask", mockTask0);
-        BaseActionRunnable mockRunnable0 = mock(BaseActionRunnable.class);
-        Whitebox.setInternalState(asyncTaskHelper, "taskRunnable", mockRunnable0);
 
-        ScheduledFuture<?> mockTask2 = mock(ScheduledFuture.class);
-        Mockito.doReturn(mockTask2).when(mockScheduler).scheduleWithFixedDelay(
-                mockRunnable, asyncTaskHelper.COMMON_INITIAL_DELAY,
-                asyncTaskHelper.COMMON_INTERVAL, TimeUnit.MILLISECONDS);
-        asyncTaskHelper.scheduleAsyncTask(rpc, mockRunnable);
-        Mockito.verify(mockTask0, times(1)).cancel(true);
-        Mockito.verify(mockRunnable0, times(1)).abortRunnable(rpc);
-        Assert.assertEquals(mockTask2, asyncTaskHelper.scheduleAsyncTask(rpc, mockRunnable));
-        Assert.assertEquals("Should set backgroundOamTask", mockTask2, asyncTaskHelper.getCurrentAsyncTask());
+    /**
+     * Make sure SubCallable cannot be scheduled after a BaseRunnable has completed
+     */
+    @Test(expected=IllegalStateException.class)
+    public void test_SubTasksScheduleFailAfterBaseDone() throws Exception {
+
+        final ExecuteTest et = new ExecuteTest();
+
+        Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
+                et::test
+                , s -> { }
+                ,initialDelayMillis
+                ,delayMillis
+        );
+
+
+        future.cancel(false);
+        Assert.assertTrue("It should not be running",waitFor(future::isDone,1000));
+
+        try {
+            asyncTaskHelper.submitBaseSubCallable(() -> null);
+        } finally {
+            et.isContinuous = false;
+        }
+
     }
 
+
+    /**
+     * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
+     * Test cancel does not block when BaseRunnable is not scheduled
+     */
     @Test
-    public void testCancelAsyncTask() throws Exception {
-        Future<?> mockTask = mock(Future.class);
-        Whitebox.setInternalState(asyncTaskHelper, "backgroundOamTask", mockTask);
-        asyncTaskHelper.cancelAsyncTask(mockTask);
-        Mockito.verify(mockTask, times(1)).cancel(false);
-        Assert.assertTrue("Should have reset backgroundOamTask",
-                asyncTaskHelper.getCurrentAsyncTask() == null);
-
-
-        Whitebox.setInternalState(asyncTaskHelper, "backgroundOamTask", mockTask);
-        Future<?> mockTask2 = mock(Future.class);
-        asyncTaskHelper.cancelAsyncTask(mockTask2);
-        Mockito.verify(mockTask2, times(1)).cancel(false);
-        Assert.assertEquals("Should not reset backgroundOamTask",
-                mockTask, asyncTaskHelper.getCurrentAsyncTask());
+    public void test_cancel_noBlockingWhenBaseRunnableNotScheduled() throws Exception{
+        //nothing is running so this should return immediately without TimeoutException
+        asyncTaskHelper.cancelBaseActionRunnable(AppcOam.RPC.stop , AppcOamStates.Started , 1, TimeUnit.MILLISECONDS);
+    }
+
+
+
+    /**
+     * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
+     * Test cancel does blocks until BaseRunnable is done scheduled
+     */
+    @Test()
+    public void test_cancel_BlockingWhenBaseRunnableNotDone() throws Exception {
+
+
+        final ExecuteTest et = new ExecuteTest();
+        et.isContinuous = true;
+        et.isIgnoreInterrupt = true;
+        asyncTaskHelper.scheduleBaseRunnable(
+                et::test
+                , s -> {
+                }
+                , initialDelayMillis
+                , delayMillis
+        );
+
+        Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
+
+
+        //we should get a timeout
+        try {
+            asyncTaskHelper.cancelBaseActionRunnable(
+                    AppcOam.RPC.stop,
+                    AppcOamStates.Started,
+                    1,
+                    TimeUnit.MILLISECONDS);
+            Assert.fail("Should have gotten TimeoutException");
+        } catch (TimeoutException e) {
+            //just ignore as it is expected
+        }
+
+
+        //release the test thread
+        et.isContinuous = false;
+
+
+        //we should not get a timeout
+        asyncTaskHelper.cancelBaseActionRunnable(
+                AppcOam.RPC.stop,
+                AppcOamStates.Started,
+                1000,
+                TimeUnit.MILLISECONDS);
+
+    }
+
+
+
+    /**
+     * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
+     * Test cancel does not block when BaseRunnable is not scheduled
+     */
+    @Test
+    public void test_BaseRunnableCancelCallback() throws Exception{
+
+        AtomicReference<AppcOam.RPC> cancelCallback = new AtomicReference<>(null);
+
+        final ExecuteTest et = new ExecuteTest();
+        et.isContinuous = true;
+        Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
+                et::test
+                , cancelCallback::set
+                , initialDelayMillis
+                , delayMillis
+        );
+
+        Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
+        Assert.assertTrue("It should be running", waitForNot(future::isDone, 1000));
+
+
+        try {
+            asyncTaskHelper.cancelBaseActionRunnable(
+                    AppcOam.RPC.stop,
+                    AppcOamStates.Started,
+                    1,
+                    TimeUnit.MILLISECONDS);
+            Assert.fail("Should have gotten TimeoutException");
+        } catch (TimeoutException e) {
+           //just ignore as it is expected
+        }
+
+
+        Assert.assertEquals("Unexpected rpc in call back",AppcOam.RPC.stop,cancelCallback.get());
+    }
+
+
+
+
+
+
+
+
+    /**
+     * @return true if the negation of the expected value is returned from the supplier within the specified
+     * amount of time
+     */
+    private static boolean waitForNot(Supplier<Boolean> s,long timeoutMillis)throws Exception{
+        return waitFor(()->!s.get(),timeoutMillis);
+    }
+
+
+    /**
+     * @return true if the expected value is returned from the supplier within the specified
+     * amount of time
+     */
+    private static boolean waitFor(Supplier<Boolean> s,long timeoutMillis) throws Exception {
+        long timeout = TimeUnit.MILLISECONDS.toMillis(timeoutMillis);
+        long expiryTime = System.currentTimeMillis() + timeout;
+        long elapsedTime;
+        while(!s.get()){
+            elapsedTime = expiryTime - System.currentTimeMillis();
+            if(elapsedTime < 1) {
+                break;
+            }
+            Thread.sleep(10);
+        }
+        return s.get();
+    }
+
+
+    /**
+     * This class is used control a thread  executed in th {@link #test()}
+     */
+    @SuppressWarnings("unused")
+    private static class ExecuteTest {
+
+
+        /** A fail safe to insure this TEst does not run indefinitely */
+        private final long EXPIRY_TIME = System.currentTimeMillis() + 10000;
+
+
+
+        /** A thread sets this value to true when it has completed the execution the of executes {@link #test()}   */
+        private  volatile boolean isExecuted = false;
+
+        /**
+         * A thread sets this value to true when it is actively executing {@link #test()} and back to false when
+         * it is not
+         */
+        private  volatile boolean isExecuting = false;
+
+        /**
+         * While this value is true, a thread will not be allowed to return from {@link #test()} It will simulate  a
+         * long execution.
+         */
+        private  volatile boolean isContinuous = false;
+
+        /**
+         * When this value is set to true, an ongoing simulation of a long execution of {@link #test()} cannot be force
+         * to abort via a  {@link Thread#interrupt()}
+         */
+        private  volatile boolean isIgnoreInterrupt = false;
+
+
+
+        /** Use to send a signal to the thread executing {@link #notifyTestExcuted(long)} */
+        private  Semaphore inner = new Semaphore(0);
+
+        /** Use to send a signal to the thread executing {@link #waitForTestExec(long)} */
+        private  Semaphore outer = new Semaphore(0);
+
+        /** The {@link Future} of the Thread executing {@link #test()}*/
+        private volatile Future<?> future;
+
+        /**
+         * When set the Thread executing {@link #test()} will cancel itself
+         * @param future - The {@link Future} of the Thread executing {@link #test()}
+         */
+        private void cancelSelfOnNextExecution(Future<?> future) {
+            this.future = future;
+        }
+
+
+        private boolean isExecuted() {
+            return isExecuted;
+        }
+
+        private boolean isExecuting() {
+            return isExecuting;
+        }
+
+
+        private boolean isContinuous() {
+            return isContinuous;
+        }
+
+
+        private boolean isIgnoreInterrupt() {
+            return isIgnoreInterrupt;
+        }
+
+
+
+        /**
+         * The thread executing this method if blocked from returning until the thread executing
+         * {@link #test()}  invokes  {@link #notifyTestExcuted(long)} or the specified time elapses
+         * @param timeoutMillis - the amount of time to wait for a execution iteration.
+         * @return true if the Thread is released because of an invocation of {@link #notifyTestExcuted(long)}
+         * @throws InterruptedException - If the Caller thread is interrupted.
+         */
+        private boolean waitForTestExec(long timeoutMillis) throws InterruptedException {
+            inner.release();
+            return outer.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
+        }
+
+
+        /**
+         * Test simulator
+         * @return  Always returns true.
+         */
+        private Boolean test() {
+            isTestExpired();
+            System.out.println("started");
+             isExecuting = true;
+             try {
+                 if (future != null) {
+                     future.cancel(false);
+                 }
+                 if(!isContinuous){
+                     notifyTestExcuted(1);
+                 }
+
+                 while(isContinuous){
+                     notifyTestExcuted(100);
+                     isTestExpired();
+                 }
+
+             } finally {
+                 isExecuting = false;
+                 isExecuted = true;
+             }
+             return true;
+        }
+
+
+        /** @throws RuntimeException if the test  has bee running too long */
+        private void isTestExpired(){
+            if(System.currentTimeMillis() > EXPIRY_TIME){
+                throw new RuntimeException("Something went wrong the test expired.");
+            }
+        }
+
+
+        /**
+         * The thread executing {@link #test()}  if blocked from returning until another thread invokes
+         * {@link #waitForTestExec(long)} or the specified time elapses
+         * @param timeoutMillis - the amount of time to wait for a execution iteration.
+         * @return true if the Thread is released because of an invocation of {@link #waitForTestExec(long)}
+         */
+        private boolean notifyTestExcuted(long timeoutMillis){
+            try {
+                boolean acquire = inner.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
+                if(acquire){
+                    outer.release();
+                    System.out.println("release");
+                }
+            } catch (InterruptedException e) {
+                if(!isIgnoreInterrupt){
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+
+    static class Sub {
+        ExecuteTest et = new ExecuteTest();
+        Future<?> future = null;
     }
 
 }
index 2edc285..878d15f 100644 (file)
@@ -80,7 +80,7 @@ public class BundleHelperTest {
         mapFromGetAppcLcmBundles.put("BundleString", mockBundle);
 
         PowerMockito.doReturn(mapFromGetAppcLcmBundles).when(bundleHelper, MemberMatcher.method(
-                BundleHelper.class, "getAppcLcmBundles")).withNoArguments();
+            BundleHelper.class, "getAppcLcmBundles")).withNoArguments();
 
         StateHelper mockStateHelper = mock(StateHelper.class);
         Whitebox.setInternalState(bundleHelper, "stateHelper", mockStateHelper);
@@ -90,25 +90,25 @@ public class BundleHelperTest {
 
         // test start
         Mockito.doReturn(true).when(mockStateHelper).isSameState(appcOamStates);
-        boolean result = bundleHelper.bundleOperations(AppcOam.RPC.start, new HashMap<>(), mockTaskHelper);
+        boolean result = bundleHelper.bundleOperations(AppcOam.RPC.start, new HashMap<>(), mockTaskHelper,null);
         Assert.assertTrue("Should be completed", result);
-        Mockito.verify(mockTaskHelper, times(1)).submitBundleLcOperation(any());
+        Mockito.verify(mockTaskHelper, times(1)).submitBaseSubCallable(any());
 
         // test start aborted
         Mockito.doReturn(false).when(mockStateHelper).isSameState(appcOamStates);
-        result = bundleHelper.bundleOperations(AppcOam.RPC.start, new HashMap<>(), mockTaskHelper);
+        result = bundleHelper.bundleOperations(AppcOam.RPC.start, new HashMap<>(), mockTaskHelper,null);
         Assert.assertFalse("Should be abort", result);
-        Mockito.verify(mockTaskHelper, times(1)).submitBundleLcOperation(any());
+        Mockito.verify(mockTaskHelper, times(1)).submitBaseSubCallable(any());
 
         // test stop
-        result = bundleHelper.bundleOperations(AppcOam.RPC.stop, new HashMap<>(), mockTaskHelper);
+        result = bundleHelper.bundleOperations(AppcOam.RPC.stop, new HashMap<>(), mockTaskHelper,null);
         Assert.assertTrue("Should be completed", result);
-        Mockito.verify(mockTaskHelper, times(2)).submitBundleLcOperation(any());
+        Mockito.verify(mockTaskHelper, times(2)).submitBaseSubCallable(any());
     }
 
     @Test(expected = APPCException.class)
     public void testBundleOperationsRpcException() throws Exception {
-        bundleHelper.bundleOperations(AppcOam.RPC.maintenance_mode, new HashMap<>(), mockTaskHelper);
+        bundleHelper.bundleOperations(AppcOam.RPC.maintenance_mode, new HashMap<>(), mockTaskHelper,null);
     }
 
     @Test
@@ -156,7 +156,7 @@ public class BundleHelperTest {
         Mockito.doReturn(null).when(fakeConf).getProperty(propKey);
         String[] propResult = bundleHelper.readPropsFromPropListName(propKey);
         Assert.assertArrayEquals("PropertyResult should be empty string array",
-                ArrayUtils.EMPTY_STRING_ARRAY, propResult);
+            ArrayUtils.EMPTY_STRING_ARRAY, propResult);
         // Property has one entry
         String propValue1 = "1234";
         String propValue2 = "5678";