Enhance TestTimeMulti 92/93892/6
authorJim Hahn <jrh3@att.com>
Mon, 19 Aug 2019 21:32:09 +0000 (17:32 -0400)
committerJim Hahn <jrh3@att.com>
Wed, 21 Aug 2019 17:49:01 +0000 (13:49 -0400)
Enhance TestTimeMulti to support execution of tasks, whether
submitted via Timers or via Executors.

Change-Id: Ib5f216730b3b69028e9581052645370b827cd446
Issue-ID: POLICY-1968
Signed-off-by: Jim Hahn <jrh3@att.com>
19 files changed:
utils-test/pom.xml
utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java [new file with mode: 0644]
utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java [new file with mode: 0644]
utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java [new file with mode: 0644]
utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java [new file with mode: 0644]
utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java [new file with mode: 0644]
utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java [new file with mode: 0644]
utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java
utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java [new file with mode: 0644]
utils-test/src/test/java/org/onap/policy/common/utils/time/PeriodicItemTest.java [new file with mode: 0644]
utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorServiceTest.java [new file with mode: 0644]
utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledFutureTest.java [new file with mode: 0644]
utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoTimerTest.java [new file with mode: 0644]
utils-test/src/test/java/org/onap/policy/common/utils/time/RunnableItemTest.java [new file with mode: 0644]
utils-test/src/test/java/org/onap/policy/common/utils/time/SleepItemTest.java [new file with mode: 0644]
utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java
utils-test/src/test/java/org/onap/policy/common/utils/time/WorkItemTest.java [new file with mode: 0644]

index 6ed4925..3744467 100644 (file)
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <groupId>org.powermock</groupId>
             <artifactId>powermock-api-mockito</artifactId>
             <scope>test</scope>
+        </dependency>
+         <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.openpojo</groupId>
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java
new file mode 100644 (file)
index 0000000..79d2f22
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import org.onap.policy.common.utils.time.TestTime;
+
+/**
+ * Work item that runs periodically.
+ */
+class PeriodicItem extends RunnableItem {
+
+    /**
+     * Time, in milliseconds, to wait between executions.
+     */
+    private final long periodMs;
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param currentTime time with which this item is associated
+     * @param associate object with which this item is associated (e.g., Timer)
+     * @param delayMs time, in milliseconds, before this item should be executed
+     * @param periodMs time, in milliseconds, to delay between each execution
+     * @param action action to be performed
+     */
+    public PeriodicItem(TestTime currentTime, Object associate, long delayMs, long periodMs, Runnable action) {
+        super(currentTime, associate, delayMs, action);
+
+        if (periodMs <= 0) {
+            throw new IllegalArgumentException("invalid period " + periodMs);
+        }
+
+        this.periodMs = periodMs;
+    }
+
+    @Override
+    public boolean bumpNextTime() {
+        bumpNextTime(periodMs);
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "PeriodicItem [nextMs=" + getNextMs() + ", periodMs=" + periodMs + ", associate=" + getAssociate() + "]";
+    }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java
new file mode 100644 (file)
index 0000000..4f9b32c
--- /dev/null
@@ -0,0 +1,186 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * Scheduled executor service that uses {@link TestTimeMulti} to execute its tasks. Note:
+ * the invokeXxx() methods are not currently supported.
+ */
+public class PseudoScheduledExecutorService implements ScheduledExecutorService {
+    private static final String NOT_IMPLEMENTED_YET = "not implemented yet";
+
+    /**
+     * Object to be used to execute timer tasks.
+     */
+    private final TestTimeMulti currentTime;
+
+    /**
+     * {@code True} if {@link #shutdown()} or {@link #shutdownNow()} has been called,
+     * {@code false} otherwise.
+     */
+    private boolean shutdown = false;
+
+    /**
+     * Constructs the object.
+     *
+     * @param currentTime object to be used to execute timer tasks
+     */
+    public PseudoScheduledExecutorService(TestTimeMulti currentTime) {
+        this.currentTime = currentTime;
+    }
+
+    /**
+     * Cancels <i>all</i> tasks that have not yet been executed.
+     */
+    @Override
+    public void shutdown() {
+        shutdown = true;
+        currentTime.cancelItems(this);
+    }
+
+    /**
+     * Cancels <i>all</i> tasks that have not yet been executed. Does <i>not</i> interrupt
+     * any currently executing task.
+     */
+    @Override
+    public List<Runnable> shutdownNow() {
+        shutdown = true;
+        return currentTime.cancelItems(this).stream().map(item -> ((RunnableItem) item).getAction())
+                        .collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return shutdown;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return shutdown;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return shutdown;
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return enqueueRunOnce(0, new FutureTask<>(task));
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return enqueueRunOnce(0, new FutureTask<>(task, result));
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        return enqueueRunOnce(0, new FutureTask<>(task, null));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                    throws InterruptedException {
+        throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                    throws InterruptedException, ExecutionException, TimeoutException {
+        throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        currentTime.enqueue(new RunnableItem(currentTime, this, 0, command));
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        return enqueueRunOnce(unit.toMillis(delay), new PseudoScheduledFuture<>(command, null, false));
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+        return enqueueRunOnce(unit.toMillis(delay), new PseudoScheduledFuture<>(callable, false));
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+        return enqueuePeriodic(unit.toMillis(initialDelay), unit.toMillis(period),
+                        new PseudoScheduledFuture<>(command, null, true));
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        return enqueuePeriodic(unit.toMillis(initialDelay), unit.toMillis(delay),
+                        new PseudoScheduledFuture<>(command, null, true));
+    }
+
+    /**
+     * Enqueues a future to be executed one time.
+     *
+     * @param delay delay until the future should be executed
+     * @param future future to be enqueued
+     * @return the future
+     */
+    private <F extends FutureTask<T>, T> F enqueueRunOnce(long delay, F future) {
+        currentTime.enqueue(new RunnableItem(currentTime, this, delay, future));
+        return future;
+    }
+
+    /**
+     * Enqueues a future to be executed periodically.
+     *
+     * @param initialDelayMs delay until the future should be executed the first time
+     * @param periodMs delay between executions of the future
+     * @param future future to be enqueued
+     * @return the future
+     */
+    private <T> ScheduledFuture<T> enqueuePeriodic(long initialDelayMs, long periodMs,
+                    PseudoScheduledFuture<T> future) {
+        currentTime.enqueue(new PeriodicItem(currentTime, this, initialDelayMs, periodMs, future));
+        return future;
+    }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java
new file mode 100644 (file)
index 0000000..6ce7bc0
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Scheduled future that gets its time from an associated work item.
+ *
+ * @param <T> type of result returned by the future
+ */
+class PseudoScheduledFuture<T> extends FutureTask<T> implements RunnableScheduledFuture<T> {
+
+    /**
+     * {@code True} if this task is periodic, {@code false} otherwise.
+     */
+    private final boolean periodic;
+
+    /**
+     * The work item with which this is associated.
+     */
+    @Getter(AccessLevel.PROTECTED)
+    @Setter(AccessLevel.PROTECTED)
+    private WorkItem workItem;
+
+    /**
+     * Constructs the object.
+     *
+     * @param runnable action to be executed
+     * @param result value to be returned by the {@link #get()} operation
+     * @param periodic {@code true} if this task is periodic, {@code false} otherwise
+     */
+    public PseudoScheduledFuture(Runnable runnable, T result, boolean periodic) {
+        super(runnable, result);
+        this.periodic = periodic;
+    }
+
+    /**
+     * Constructs the object.
+     *
+     * @param callable action to be executed
+     * @param periodic {@code true} if this task is periodic, {@code false} otherwise
+     */
+    public PseudoScheduledFuture(Callable<T> callable, boolean periodic) {
+        super(callable);
+        this.periodic = periodic;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+        return unit.convert(workItem.getDelay(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed other) {
+        return Long.compare(workItem.getDelay(), other.getDelay(TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public boolean isPeriodic() {
+        return periodic;
+    }
+
+    @Override
+    public void run() {
+        if (isPeriodic()) {
+            super.runAndReset();
+
+        } else {
+            super.run();
+        }
+    }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java
new file mode 100644 (file)
index 0000000..e8de89a
--- /dev/null
@@ -0,0 +1,100 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * A timer that uses {@link TestTimeMulti} to execute its tasks.
+ *
+ * <p/>Note: this only supports the run() method of {@link TimerTask}; the other methods,
+ * including cancel() are not supported.  However, tasks may be canceled via
+ * {@link Timer#cancel()}.
+ *
+ * <p/>Currently, this does not support any of the scheduling methods that take dates,
+ * though that could be added relatively easily.
+ */
+public class PseudoTimer extends Timer {
+    private static final String NOT_IMPLEMENTED_YET = "not implemented yet";
+
+    /**
+     * Time with which this item is associated.
+     */
+    private final TestTimeMulti currentTime;
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param currentTime object to be used to execute timer tasks
+     */
+    public PseudoTimer(TestTimeMulti currentTime) {
+        // create as a daemon so jvm doesn't hang when it attempts to exit
+        super(true);
+
+        this.currentTime = currentTime;
+
+        // don't need the timer's thread
+        super.cancel();
+    }
+
+    @Override
+    public void schedule(TimerTask task, long delayMs) {
+        currentTime.enqueue(new RunnableItem(currentTime, this, delayMs, task));
+    }
+
+    @Override
+    public void schedule(TimerTask task, Date time) {
+        throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+    }
+
+    @Override
+    public void schedule(TimerTask task, long delayMs, long periodMs) {
+        currentTime.enqueue(new PeriodicItem(currentTime, this, delayMs, periodMs, task));
+    }
+
+    @Override
+    public void schedule(TimerTask task, Date firstTime, long period) {
+        throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+    }
+
+    @Override
+    public void scheduleAtFixedRate(TimerTask task, long delayMs, long periodMs) {
+        currentTime.enqueue(new PeriodicItem(currentTime, this, delayMs, periodMs, task));
+    }
+
+    @Override
+    public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) {
+        throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+    }
+
+    @Override
+    public void cancel() {
+        currentTime.cancelItems(this);
+    }
+
+    @Override
+    public int purge() {
+        return 0;
+    }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java
new file mode 100644 (file)
index 0000000..5456031
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.concurrent.Future;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.onap.policy.common.utils.time.TestTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Work item that may be run/executed.
+ */
+class RunnableItem extends WorkItem {
+    private static final Logger logger = LoggerFactory.getLogger(RunnableItem.class);
+
+    /**
+     * Object with which this item is associated.
+     */
+    @Getter(AccessLevel.PROTECTED)
+    private final Object associate;
+
+    /**
+     * Action to execute.
+     */
+    @Getter(AccessLevel.PROTECTED)
+    private final Runnable action;
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param currentTime time with which this item is associated
+     * @param associate object with which this item is associated (e.g., Timer)
+     * @param delayMs time, in milliseconds, before this item should be executed
+     * @param action action to be performed
+     */
+    public RunnableItem(TestTime currentTime, Object associate, long delayMs, Runnable action) {
+        super(currentTime, delayMs);
+        this.associate = associate;
+        this.action = action;
+
+        // ensure the task can properly compute its delay
+        if (action instanceof PseudoScheduledFuture) {
+            ((PseudoScheduledFuture<?>) action).setWorkItem(this);
+        }
+    }
+
+    @Override
+    public boolean isAssociatedWith(Object associate) {
+        return (this.associate == associate);
+    }
+
+    @Override
+    public boolean wasCancelled() {
+        return (action instanceof Future && ((Future<?>) action).isCancelled());
+    }
+
+    @Override
+    public void fire() {
+        try {
+            action.run();
+        } catch (RuntimeException e) {
+            logger.warn("work item {} threw an exception {}", this, e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "RunnableItem [nextMs=" + getNextMs() + ", associate=" + associate + "]";
+    }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java
new file mode 100644 (file)
index 0000000..1d31880
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Work item used when a thread invokes sleep(). The thread's "sleep()" method will
+ * enqueue this item and then invoke {@link #await()} to wait for the test/controlling
+ * thread to fire it, indicating that the end of the sleep time has been reached.
+ */
+public class SleepItem extends WorkItem {
+    /**
+     * Thread that invoked "sleep()".
+     */
+    private final Thread thread;
+
+    /**
+     * This will be decremented when this work item is fired, thus releasing the
+     * "sleeping" thread to continue its work.
+     */
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param currentTime time with which this item is associated
+     * @param sleepMs time for which the thread should sleep
+     * @param thread thread that invoked "sleep()"
+     */
+    public SleepItem(TestTime currentTime, long sleepMs, Thread thread) {
+        super(currentTime, sleepMs);
+        this.thread = thread;
+    }
+
+    @Override
+    public void interrupt() {
+        thread.interrupt();
+    }
+
+    @Override
+    public void fire() {
+        latch.countDown();
+    }
+
+    /**
+     * Waits for the sleep time to be reached.
+     *
+     * @throws InterruptedException if the current thread is interrupted
+     */
+    public void await() throws InterruptedException {
+        latch.await();
+    }
+
+    @Override
+    public String toString() {
+        return "SleepItem [nextMs=" + getNextMs() + ", latch=" + latch + ", thread=" + thread + "]";
+    }
+}
index 414c18b..420021f 100644 (file)
@@ -2,14 +2,14 @@
  * ============LICENSE_START=======================================================
  * Common Utils-Test
  * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -37,7 +37,7 @@ public class TestTime extends CurrentTime {
 
     /**
      *  Constructor.
-     *  
+     *
      */
     public TestTime() {
         super();
@@ -55,6 +55,8 @@ public class TestTime extends CurrentTime {
 
     @Override
     public void sleep(long sleepMs) throws InterruptedException {
-        tcur.addAndGet(sleepMs);
+        if (sleepMs > 0) {
+            tcur.addAndGet(sleepMs);
+        }
     }
 }
index b37e49e..f52105e 100644 (file)
@@ -1,6 +1,6 @@
-/*
+/*-
  * ============LICENSE_START=======================================================
- * Common Utils-Test
+ * ONAP
  * ================================================================================
  * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
 
 package org.onap.policy.common.utils.time;
 
-import java.util.Date;
+import static org.junit.Assert.fail;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.PriorityQueue;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.onap.policy.common.utils.time.TestTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * "Current" time, when running junit tests in multiple threads. This is intended to be
  * injected into classes under test, to replace their {@link CurrentTime} objects. The
- * {@link #sleep(long)} method blocks until all threads enter and then it moves the notion
- * of "current" time forward, allowing threads to resume, as the end of their sleep time
- * is reached. Additional threads do not resume until all threads have once again entered
- * {@link #sleep(long)} or when {@link #threadCompleted()} is invoked to indicate that a
- * thread will not re-enter {@link #sleep(long)}.
+ * {@link #sleep(long)} method blocks until the "time" has reached the specified sleep
+ * time. A queue of work items is maintained, sorted by the time for which the items are
+ * scheduled to execute. Tasks are executed by the test/controlling thread when one of the
+ * waitXxx() methods is invoked. {@link PseudoTimer} and
+ * {@link PseudoScheduledExecutorService} add work items to the queue.
+ *
+ * <p/>
+ * This only handles relatively simple situations, though it does support multi-threaded
+ * testing.
  */
-public class TestTimeMulti extends CurrentTime {
+public class TestTimeMulti extends TestTime {
+    private static final Logger logger = LoggerFactory.getLogger(TestTimeMulti.class);
+
+    public static final String NEVER_SATISFIED = "condition was never satisfied";
+
+    /**
+     * Default value, in milliseconds, to wait for an item to be added to the queue.
+     */
+    public static final long DEFAULT_MAX_WAIT_MS = 5000L;
 
     /**
-     * Number of threads that will be sleeping simultaneously.
+     * Maximum time that the test thread should wait for something to be added to its work
+     * queue.
      */
-    private int nthreads;
+    @Getter
+    private final long maxWaitMs;
 
     /**
-     * "Current" time, in milliseconds, used by tests.
+     * Queue of timer tasks to be executed, sorted by {@link WorkItem#nextMs}.
      */
-    private long tcur = System.currentTimeMillis();
+    private final PriorityQueue<WorkItem> queue =
+                    new PriorityQueue<>((item1, item2) -> Long.compare(item1.getNextMs(), item2.getNextMs()));
 
     /**
-     * Queue of sleeping threads waiting to be awakened.
+     * Lock used when modifying the queue.
      */
-    private final PriorityQueue<Info> queue = new PriorityQueue<>();
+    private final Object updateLock = new Object();
 
     /**
-     * Used to synchronize updates.
+     * Constructs the object using the default maximum wait time.
      */
-    private final Object locker = new Object();
+    public TestTimeMulti() {
+        this(DEFAULT_MAX_WAIT_MS);
+    }
 
     /**
-     * Constructor.
+     * Constructs the object.
      *
-     * @param nthreads number of threads that will be sleeping simultaneously
+     * @param maxWaitMs maximum time that the test thread should wait for something to be
+     *        added to its work queue
      */
-    public TestTimeMulti(int nthreads) {
-        this.nthreads = nthreads;
+    public TestTimeMulti(long maxWaitMs) {
+        this.maxWaitMs = maxWaitMs;
     }
 
-    @Override
-    public long getMillis() {
-        return tcur;
+    /**
+     * Determines if the task queue is empty.
+     *
+     * @return {@code true} if the task queue is empty, {@code false} otherwise
+     */
+    public boolean isEmpty() {
+        synchronized (updateLock) {
+            purgeItems();
+            return queue.isEmpty();
+        }
     }
 
-    @Override
-    public Date getDate() {
-        return new Date(tcur);
+    /**
+     * Gets the number of tasks in the work queue.
+     *
+     * @return the number of tasks in the work queue
+     */
+    public int queueLength() {
+        synchronized (updateLock) {
+            purgeItems();
+            return queue.size();
+        }
     }
 
-    @Override
-    public void sleep(long sleepMs) throws InterruptedException {
-        if (sleepMs <= 0) {
-            return;
+    /**
+     * Indicates that this will no longer be used. Interrupts any threads that are waiting
+     * for their "sleep()" to complete.
+     */
+    public void destroy() {
+        synchronized (updateLock) {
+            queue.forEach(WorkItem::interrupt);
+            queue.clear();
         }
+    }
 
-        Info info = new Info(tcur + sleepMs);
+    /**
+     * Runs a single task from the queue.
+     *
+     * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather
+     *        than pseudo time
+     *
+     * @return {@code true} if a task was run, {@code false} if the queue was empty
+     * @throws InterruptedException if the current thread is interrupted
+     */
+    public boolean runOneTask(long waitMs) throws InterruptedException {
+        WorkItem item = pollQueue(waitMs);
+        if (item == null) {
+            return false;
+        }
 
-        synchronized (locker) {
-            queue.add(info);
+        runItem(item);
+        return true;
+    }
 
-            if (queue.size() == nthreads) {
-                // all threads are now sleeping - wake one up
-                wakeThreads();
+    /**
+     * Waits for the pseudo time to reach a certain point. Executes work items until the
+     * time is reached.
+     *
+     * @param waitMs pseudo time, in milliseconds, for which to wait
+     * @throws InterruptedException if the current thread is interrupted
+     */
+    public void waitFor(long waitMs) throws InterruptedException {
+        // pseudo time for which we're waiting
+        long tend = getMillis() + waitMs;
+
+        while (getMillis() < tend) {
+            if (!runOneTask(maxWaitMs)) {
+                /*
+                 * Waited the maximum poll time and nothing has happened, so we'll just
+                 * bump the time directly.
+                 */
+                super.sleep(tend - getMillis());
+                break;
             }
         }
-
-        // this MUST happen outside of the "synchronized" block
-        info.await();
     }
 
     /**
-     * Indicates that a thread has terminated or that it will no longer be invoking
-     * {@link #sleep(long)}. Awakens the next sleeping thread, if the queue is full after
-     * removing the terminated thread.
+     * Waits for a condition to become true. Executes work items until the given condition
+     * is true.
      *
-     * @throws IllegalStateException if the queue is already full
+     * @param condition condition to be checked
      */
-    public void threadCompleted() {
-        synchronized (locker) {
-            int sz = queue.size();
-            if (sz >= nthreads) {
-                throw new IllegalStateException("too many threads still sleeping");
-            }
+    public void waitUntil(Callable<Boolean> condition) {
+        try {
+            // real time for which we're waiting
+            long realEnd = System.currentTimeMillis() + maxWaitMs;
 
-            --nthreads;
+            while (System.currentTimeMillis() < realEnd) {
+                if (condition.call()) {
+                    return;
+                }
 
-            if (sz == nthreads) {
-                // after removing terminated thread - queue is now full; awaken something
-                wakeThreads();
+                runOneTask(100);
             }
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("interrupted while waiting for condition", e);
+            fail("interrupted while waiting for condition: " + e.getMessage());
+
+        } catch (Exception e) {
+            logger.error("condition evaluator threw an exception", e);
+            fail("condition evaluator threw an exception: " + e.getMessage());
         }
+
+        fail(NEVER_SATISFIED);
     }
 
     /**
-     * Advances the "current" time and awakens any threads sleeping until that time.
+     * Waits for a condition to become true. Executes work items until the given condition
+     * is true or the maximum wait time is reached.
+     *
+     * @param twait maximum, pseudo time to wait
+     * @param units time units represented by "twait"
+     * @param condition condition to be checked
      */
-    private void wakeThreads() {
-        Info info = queue.poll();
-        if (info == null) {
-            return;
-        }
+    public void waitUntil(long twait, TimeUnit units, Callable<Boolean> condition) {
+        // pseudo time for which we're waiting
+        long tend = getMillis() + units.toMillis(twait);
 
-        tcur = info.getAwakenAtMs();
-        info.wake();
+        waitUntil(() -> {
+            if (getMillis() >= tend) {
+                fail(NEVER_SATISFIED);
+            }
 
-        while ((info = queue.poll()) != null) {
-            if (tcur == info.getAwakenAtMs()) {
-                info.wake();
+            return condition.call();
+        });
+    }
 
-            } else {
-                // not ready to wake this thread - put it back in the queue
-                queue.add(info);
-                break;
+    /**
+     * Gets one item from the work queue.
+     *
+     * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather
+     *        than pseudo time
+     * @return the first item in the queue, or {@code null} if no item was added to the
+     *         queue before the wait time expired
+     * @throws InterruptedException if the current thread was interrupted
+     */
+    private WorkItem pollQueue(long waitMs) throws InterruptedException {
+        long realEnd = System.currentTimeMillis() + waitMs;
+        WorkItem work;
+
+        synchronized (updateLock) {
+            while ((work = queue.poll()) == null) {
+                updateLock.wait(Math.max(1, realEnd - System.currentTimeMillis()));
+
+                if (queue.isEmpty() && System.currentTimeMillis() >= realEnd) {
+                    return null;
+                }
             }
         }
+
+        return work;
     }
 
     /**
-     * Info about a sleeping thread.
+     * Runs a work item.
+     *
+     * @param work work item to be run
+     * @throws InterruptedException if the current thread was interrupted
      */
-    private static class Info implements Comparable<Info> {
-
-        /**
-         * Time, in milliseconds, at which the associated thread should awaken.
-         */
-        private final long awakenAtMs;
+    private void runItem(WorkItem work) throws InterruptedException {
+        if (work.wasCancelled()) {
+            logger.info("work item was canceled {}", work);
+            return;
+        }
 
-        /**
-         * This is triggered when the associated thread should awaken.
-         */
-        private final CountDownLatch latch = new CountDownLatch(1);
+        // update the pseudo time
+        super.sleep(work.getNextMs() - getMillis());
 
-        /**
-         * Constructor.
-         *
-         * @param awakenAtMs time, in milliseconds, at which the associated thread should
-         *        awaken
+        /*
+         * Add it back into the queue if appropriate, in case cancel() is called while
+         * it's executing.
          */
-        public Info(long awakenAtMs) {
-            this.awakenAtMs = awakenAtMs;
+        if (work.bumpNextTime()) {
+            logger.info("re-enqueuing work item");
+            enqueue(work);
         }
 
-        public long getAwakenAtMs() {
-            return awakenAtMs;
+        logger.info("fire work item {}", work);
+        work.fire();
+    }
+
+    @Override
+    public void sleep(long sleepMs) throws InterruptedException {
+        if (sleepMs <= 0) {
+            return;
         }
 
-        /**
-         * Awakens the associated thread by decrementing its latch.
-         */
-        public void wake() {
-            latch.countDown();
+        SleepItem item = new SleepItem(this, sleepMs, Thread.currentThread());
+        enqueue(item);
+
+        // wait for the item to fire
+        logger.info("sleeping {}", item);
+        item.await();
+        logger.info("done sleeping {}", Thread.currentThread());
+    }
+
+    /**
+     * Adds an item to the {@link #queue}.
+     *
+     * @param item item to be added
+     */
+    protected void enqueue(WorkItem item) {
+        logger.info("enqueue work item {}", item);
+        synchronized (updateLock) {
+            queue.add(item);
+            updateLock.notify();
         }
+    }
 
-        /**
-         * Blocks the current thread until awakened (i.e., until its latch is
-         * decremented).
-         *
-         * @throws InterruptedException can be interrupted
-         */
-        public void await() throws InterruptedException {
-            latch.await();
+    /**
+     * Cancels work items by removing them from the queue if they're associated with the
+     * specified object.
+     *
+     * @param associate object whose associated items are to be cancelled
+     * @return list of items that were canceled
+     */
+    protected List<WorkItem> cancelItems(Object associate) {
+        List<WorkItem> items = new LinkedList<>();
+
+        synchronized (updateLock) {
+            Iterator<WorkItem> iter = queue.iterator();
+            while (iter.hasNext()) {
+                WorkItem item = iter.next();
+                if (item.isAssociatedWith(associate)) {
+                    iter.remove();
+                    items.add(item);
+                }
+            }
         }
 
-        @Override
-        public int compareTo(Info object) {
-            int diff = Long.compare(awakenAtMs, object.awakenAtMs);
+        return items;
+    }
 
-            // this assumes that Object.toString() is unique for each Info object
-            if (diff == 0) {
-                diff = this.toString().compareTo(object.toString());
+    /**
+     * Purges work items that are known to have been canceled. (Does not remove canceled
+     * TimerTasks, as there is no way via the public API to determine if the task has been
+     * canceled.)
+     */
+    public void purgeItems() {
+        synchronized (updateLock) {
+            Iterator<WorkItem> iter = queue.iterator();
+            while (iter.hasNext()) {
+                if (iter.next().wasCancelled()) {
+                    iter.remove();
+                }
             }
-            return diff;
         }
-
     }
 }
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java
new file mode 100644 (file)
index 0000000..af3d5d7
--- /dev/null
@@ -0,0 +1,128 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.onap.policy.common.utils.time.TestTime;
+
+/**
+ * Work item to be executed at some time.
+ */
+class WorkItem {
+
+    /**
+     * Pseudo time with which this item is associated.
+     */
+    private final TestTime currentTime;
+
+    /**
+     * Time, in milliseconds, when the timer should fire next.
+     */
+    @Getter(AccessLevel.PROTECTED)
+    private long nextMs;
+
+
+    /**
+     * Constructs the object.
+     *
+     * @param currentTime time with which this item is associated
+     * @param delayMs time, in milliseconds, before this item should be executed
+     */
+    public WorkItem(TestTime currentTime, long delayMs) {
+        if (delayMs < 0) {
+            throw new IllegalArgumentException("invalid delay " + delayMs);
+        }
+
+        this.currentTime = currentTime;
+        bumpNextTime(delayMs);
+    }
+
+    /**
+     * Gets the delay until the item should be fired.
+     *
+     * @return the delay until the item should be fired
+     */
+    public long getDelay() {
+        return (nextMs - currentTime.getMillis());
+    }
+
+    /**
+     * Determines if this work item was canceled.
+     *
+     * @return {@code true} if this item was canceled, {@code false} otherwise
+     */
+    public boolean wasCancelled() {
+        return false;
+    }
+
+    /**
+     * Bumps {@link #nextMs}, if this is a periodic task. The default method simply
+     * returns {@code false}.
+     *
+     * @return {@code true} if the time was bumped, {@code false} otherwise (i.e., it is
+     *         not a periodic task)
+     */
+    public boolean bumpNextTime() {
+        return false;
+    }
+
+    /**
+     * Bumps {@link #nextMs}, setting it to the current time plus the given delay.
+     *
+     * @param delayMs time, in milliseconds, before this item should be (re-)executed
+     */
+    protected void bumpNextTime(long delayMs) {
+        if (delayMs < 0) {
+            throw new IllegalArgumentException("negative delay");
+        }
+
+        // always bump by at least 1 millisecond
+        this.nextMs = currentTime.getMillis() + Math.max(1, delayMs);
+    }
+
+    /**
+     * Interrupts the thread that created the work item, if appropriate. The default
+     * method does nothing.
+     */
+    public void interrupt() {
+        // do nothing
+    }
+
+    /**
+     * Determines if this item is associated with the given object. The default method
+     * simply returns {@code false}.
+     *
+     * @param associate candidate associate (e.g., Timer)
+     * @return {@code true} if the item is associated with the given object, {@code false}
+     *         otherwise
+     */
+    public boolean isAssociatedWith(Object associate) {
+        return false;
+    }
+
+    /**
+     * Fires/executes this item. The default method does nothing.
+     */
+    public void fire() {
+        // do nothing
+    }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PeriodicItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PeriodicItemTest.java
new file mode 100644 (file)
index 0000000..3e64edf
--- /dev/null
@@ -0,0 +1,78 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class PeriodicItemTest {
+    private static final long DELAY_MS = 100L;
+    private static final long PERIOD_MS = 200L;
+    private static final Object ASSOCIATE = new Object();
+
+    private TestTime currentTime;
+    private int count;
+    private PeriodicItem item;
+
+    /**
+     * Sets up objects, including {@link #item}.
+     */
+    @Before
+    public void setUp() {
+        currentTime = new TestTime();
+        count = 0;
+        item = new PeriodicItem(currentTime, ASSOCIATE, DELAY_MS, PERIOD_MS, () -> count++);
+    }
+
+    @Test
+    public void testBumpNextTime() {
+        assertTrue(item.bumpNextTime());
+        assertEquals(currentTime.getMillis() + PERIOD_MS, item.getNextMs());
+    }
+
+    @Test
+    public void testToString() {
+        assertNotNull(item.toString());
+    }
+
+    @Test
+    public void testPeriodicItem() {
+        assertSame(ASSOCIATE, item.getAssociate());
+        assertNotNull(item.getAction());
+        assertEquals(currentTime.getMillis() + DELAY_MS, item.getNextMs());
+
+        item.getAction().run();
+        assertEquals(1, count);
+
+        // invalid period
+        assertThatIllegalArgumentException()
+            .isThrownBy(() -> new PeriodicItem(currentTime, ASSOCIATE, DELAY_MS, 0, () -> count++));
+        assertThatIllegalArgumentException()
+            .isThrownBy(() -> new PeriodicItem(currentTime, ASSOCIATE, DELAY_MS, -1, () -> count++));
+    }
+
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorServiceTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorServiceTest.java
new file mode 100644 (file)
index 0000000..70820c4
--- /dev/null
@@ -0,0 +1,267 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PseudoScheduledExecutorServiceTest {
+    private static final long DELAY_MS = 100L;
+    private static final long PERIOD_MS = 200L;
+
+    private int ran;
+    private int called;
+    private TestTimeMulti currentTime;
+    private PseudoScheduledExecutorService svc;
+
+    /**
+     * Sets up objects, including {@link #svc}.
+     */
+    @Before
+    public void setUp() {
+        ran = 0;
+        called = 0;
+        currentTime = new TestTimeMulti();
+        svc = new PseudoScheduledExecutorService(currentTime);
+    }
+
+    @Test
+    public void testShutdown() {
+        // submit some tasks
+        svc.submit(new MyRun());
+        svc.schedule(new MyRun(), 1L, TimeUnit.SECONDS);
+
+        svc.shutdown();
+        assertTrue(svc.isShutdown());
+
+        // task should have been removed
+        assertTrue(currentTime.isEmpty());
+    }
+
+    @Test
+    public void testShutdownNow() {
+        // submit some tasks
+        svc.submit(new MyRun());
+        svc.schedule(new MyRun(), 1L, TimeUnit.SECONDS);
+
+        svc.shutdownNow();
+        assertTrue(svc.isShutdown());
+
+        // task should have been removed
+        assertTrue(currentTime.isEmpty());
+    }
+
+    @Test
+    public void testIsShutdown_testIsTerminated() {
+        assertFalse(svc.isShutdown());
+        assertFalse(svc.isTerminated());
+
+        svc.shutdown();
+        assertTrue(svc.isShutdown());
+        assertTrue(svc.isTerminated());
+    }
+
+    @Test
+    public void testAwaitTermination() throws InterruptedException {
+        assertFalse(svc.awaitTermination(1L, TimeUnit.SECONDS));
+
+        svc.shutdown();
+        assertTrue(svc.awaitTermination(1L, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testSubmitCallableOfT() throws Exception {
+        Future<Integer> future = svc.submit(new MyCallable());
+        currentTime.runOneTask(0);
+
+        assertEquals(1, called);
+        assertEquals(1, future.get().intValue());
+
+        // nothing re-queued
+        assertTrue(currentTime.isEmpty());
+    }
+
+    @Test
+    public void testSubmitRunnableT() throws Exception {
+        Future<Integer> future = svc.submit(new MyRun(), 2);
+        currentTime.runOneTask(0);
+
+        assertEquals(1, ran);
+        assertEquals(2, future.get().intValue());
+
+        // nothing re-queued
+        assertTrue(currentTime.isEmpty());
+    }
+
+    @Test
+    public void testSubmitRunnable() throws Exception {
+        assertNotNull(svc.submit(new MyRun()));
+        currentTime.runOneTask(0);
+
+        assertEquals(1, ran);
+
+        // nothing re-queued
+        assertTrue(currentTime.isEmpty());
+    }
+
+    @Test
+    public void testInvokeAllCollectionOfQextendsCallableOfT() {
+        assertThatThrownBy(() -> svc.invokeAll(Collections.emptyList()))
+                        .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    @Test
+    public void testInvokeAllCollectionOfQextendsCallableOfTLongTimeUnit() {
+        assertThatThrownBy(() -> svc.invokeAll(Collections.emptyList(), 1, TimeUnit.MILLISECONDS))
+                        .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    @Test
+    public void testInvokeAnyCollectionOfQextendsCallableOfT() {
+        assertThatThrownBy(() -> svc.invokeAny(Collections.emptyList()))
+                        .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    @Test
+    public void testInvokeAnyCollectionOfQextendsCallableOfTLongTimeUnit() {
+        assertThatThrownBy(() -> svc.invokeAny(Collections.emptyList(), 1, TimeUnit.MILLISECONDS))
+                        .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    @Test
+    public void testExecute() throws InterruptedException {
+        svc.execute(new MyRun());
+        currentTime.runOneTask(0);
+
+        assertEquals(1, ran);
+
+        // nothing re-queued
+        assertTrue(currentTime.isEmpty());
+    }
+
+    @Test
+    public void testScheduleRunnableLongTimeUnit() throws InterruptedException {
+        assertNotNull(svc.schedule(new MyRun(), DELAY_MS, TimeUnit.MILLISECONDS));
+
+        assertEquals(DELAY_MS, oneTaskElapsedTime());
+        assertEquals(1, ran);
+
+        // verify nothing re-scheduled
+        assertTrue(currentTime.isEmpty());
+    }
+
+    @Test
+    public void testScheduleCallableOfVLongTimeUnit() throws Exception {
+        ScheduledFuture<Integer> future = svc.schedule(new MyCallable(), DELAY_MS, TimeUnit.MILLISECONDS);
+
+        assertEquals(DELAY_MS, oneTaskElapsedTime());
+        assertEquals(1, called);
+        assertEquals(1, future.get().intValue());
+
+        // verify nothing re-scheduled
+        assertTrue(currentTime.isEmpty());
+    }
+
+    @Test
+    public void testScheduleAtFixedRate() throws InterruptedException {
+        final ScheduledFuture<?> future =
+                        svc.scheduleAtFixedRate(new MyRun(), DELAY_MS, PERIOD_MS, TimeUnit.MILLISECONDS);
+
+        assertEquals(DELAY_MS, oneTaskElapsedTime());
+        assertEquals(1, ran);
+
+        assertEquals(PERIOD_MS, oneTaskElapsedTime());
+        assertEquals(2, ran);
+
+        assertEquals(PERIOD_MS, oneTaskElapsedTime());
+        assertEquals(3, ran);
+
+        future.cancel(false);
+
+        // should not actually execute
+        assertEquals(0, oneTaskElapsedTime());
+        assertEquals(3, ran);
+
+        // verify nothing re-scheduled
+        assertTrue(currentTime.isEmpty());
+    }
+
+    @Test
+    public void testScheduleWithFixedDelay() throws InterruptedException {
+        final ScheduledFuture<?> future =
+                        svc.scheduleWithFixedDelay(new MyRun(), DELAY_MS, PERIOD_MS, TimeUnit.MILLISECONDS);
+
+        assertEquals(DELAY_MS, oneTaskElapsedTime());
+        assertEquals(1, ran);
+
+        assertEquals(PERIOD_MS, oneTaskElapsedTime());
+        assertEquals(2, ran);
+
+        assertEquals(PERIOD_MS, oneTaskElapsedTime());
+        assertEquals(3, ran);
+
+        future.cancel(false);
+
+        // should not actually execute
+        assertEquals(0, oneTaskElapsedTime());
+        assertEquals(3, ran);
+
+        // verify nothing re-scheduled
+        assertTrue(currentTime.isEmpty());
+    }
+
+    /**
+     * Runs a single task and returns its elapsed (pseudo) time.
+     *
+     * @return the elapsed time taken to run the task
+     * @throws InterruptedException if the thread is interrupted
+     */
+    private long oneTaskElapsedTime() throws InterruptedException {
+        final long tbegin = currentTime.getMillis();
+        currentTime.runOneTask(0);
+        return (currentTime.getMillis() - tbegin);
+    }
+
+    private class MyRun implements Runnable {
+        @Override
+        public void run() {
+            ++ran;
+        }
+    }
+
+    private class MyCallable implements Callable<Integer> {
+        @Override
+        public Integer call() {
+            return ++called;
+        }
+    }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledFutureTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledFutureTest.java
new file mode 100644 (file)
index 0000000..e23bbd2
--- /dev/null
@@ -0,0 +1,145 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class PseudoScheduledFutureTest {
+    private static final long DELAY_MS = 1000L;
+
+    private int count;
+
+    @Mock
+    private WorkItem work;
+
+    private PseudoScheduledFuture<Integer> future;
+
+    /**
+     * Sets up objects, including {@link #future}.
+     */
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        when(work.getDelay()).thenReturn(DELAY_MS);
+
+        count = 0;
+        future = new PseudoScheduledFuture<>(() -> ++count, true);
+        future.setWorkItem(work);
+    }
+
+    @Test
+    public void testRun() {
+        // verify with a periodic task - should execute twice
+        count = 0;
+        future.run();
+        future.run();
+        assertEquals(2, count);
+
+        // verify with an aperiodic task - should only execute once
+        future = new PseudoScheduledFuture<>(() -> ++count, false);
+        count = 0;
+        future.run();
+        future.run();
+        assertEquals(1, count);
+    }
+
+    @Test
+    public void testPseudoScheduledFutureRunnableTBoolean() throws Exception {
+        final Integer result = 100;
+        future = new PseudoScheduledFuture<>(() -> ++count, result, true);
+        assertTrue(future.isPeriodic());
+        future.run();
+        future.run();
+        assertEquals(2, count);
+
+        // verify with aperiodic constructor
+        future = new PseudoScheduledFuture<>(() -> ++count, result, false);
+        count = 0;
+        assertFalse(future.isPeriodic());
+        future.run();
+        future.run();
+        assertEquals(1, count);
+        assertEquals(result, future.get());
+    }
+
+    @Test
+    public void testPseudoScheduledFutureCallableOfTBoolean() throws Exception {
+        assertTrue(future.isPeriodic());
+        future.run();
+        future.run();
+        assertEquals(2, count);
+
+        // verify with aperiodic constructor
+        future = new PseudoScheduledFuture<>(() -> ++count, false);
+        count = 0;
+        assertFalse(future.isPeriodic());
+        future.run();
+        assertEquals(1, future.get().intValue());
+        future.run();
+        assertEquals(1, count);
+    }
+
+    @Test
+    public void testGetDelay() {
+        assertEquals(DELAY_MS, future.getDelay(TimeUnit.MILLISECONDS));
+        assertEquals(TimeUnit.MILLISECONDS.toSeconds(DELAY_MS), future.getDelay(TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testCompareTo() {
+        Delayed delayed = mock(Delayed.class);
+        when(delayed.getDelay(TimeUnit.MILLISECONDS)).thenReturn(DELAY_MS + 1);
+
+        assertTrue(future.compareTo(delayed) < 0);
+    }
+
+    @Test
+    public void testIsPeriodic() {
+        assertTrue(future.isPeriodic());
+        assertFalse(new PseudoScheduledFuture<>(() -> ++count, false).isPeriodic());
+    }
+
+    @Test
+    public void testGetWorkItem() {
+        assertSame(work, future.getWorkItem());
+    }
+
+    @Test
+    public void testSetWorkItem() {
+        work = mock(WorkItem.class);
+        future.setWorkItem(work);
+        assertSame(work, future.getWorkItem());
+    }
+
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoTimerTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoTimerTest.java
new file mode 100644 (file)
index 0000000..4971053
--- /dev/null
@@ -0,0 +1,141 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+import java.util.TimerTask;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PseudoTimerTest {
+    private static final long DELAY_MS = 1000L;
+    private static final long PERIOD_MS = 2000L;
+
+    private int count;
+    private TestTimeMulti currentTime;
+    private PseudoTimer timer;
+
+    /**
+     * Sets up objects, including {@link #timer}.
+     */
+    @Before
+    public void setUp() {
+        count = 0;
+        currentTime = new TestTimeMulti();
+        timer = new PseudoTimer(currentTime);
+    }
+
+    @Test
+    public void testCancel() {
+        // schedule two tasks
+        timer.scheduleAtFixedRate(new MyTask(), DELAY_MS, PERIOD_MS);
+        timer.schedule(new MyTask(), DELAY_MS);
+
+        assertFalse(currentTime.isEmpty());
+
+        // cancel the timer
+        timer.cancel();
+
+        // invoke it again to ensure no exception
+        timer.cancel();
+    }
+
+    @Test
+    public void testPurge() {
+        assertEquals(0, timer.purge());
+        assertEquals(0, timer.purge());
+    }
+
+    @Test
+    public void testScheduleTimerTaskLong() throws InterruptedException {
+        timer.schedule(new MyTask(), DELAY_MS);
+        assertFalse(currentTime.isEmpty());
+
+        // wait for the initial delay
+        currentTime.waitFor(DELAY_MS);
+        assertEquals(1, count);
+
+        assertTrue(currentTime.isEmpty());
+    }
+
+    @Test
+    public void testScheduleTimerTaskDate() {
+        assertThatThrownBy(() -> timer.schedule(new MyTask(), new Date()))
+                        .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    @Test
+    public void testScheduleTimerTaskLongLong() throws InterruptedException {
+        timer.schedule(new MyTask(), DELAY_MS, PERIOD_MS);
+        assertFalse(currentTime.isEmpty());
+
+        // wait for the initial delay plus a couple of additional periods
+        final long tbegin = System.currentTimeMillis();
+        currentTime.waitFor(DELAY_MS + PERIOD_MS * 2);
+        assertTrue(count >= 3);
+
+        assertFalse(currentTime.isEmpty());
+
+        // this thread should not have blocked while waiting
+        assertTrue(System.currentTimeMillis() < tbegin + 2000);
+    }
+
+    @Test
+    public void testScheduleTimerTaskDateLong() {
+        assertThatThrownBy(() -> timer.schedule(new MyTask(), new Date(), 1L))
+                        .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    @Test
+    public void testScheduleAtFixedRateTimerTaskLongLong() throws InterruptedException {
+        timer.scheduleAtFixedRate(new MyTask(), DELAY_MS, PERIOD_MS);
+        assertFalse(currentTime.isEmpty());
+
+        // wait for the initial delay plus a couple of additional periods
+        final long tbegin = System.currentTimeMillis();
+        currentTime.waitFor(DELAY_MS + PERIOD_MS * 2);
+        assertTrue(count >= 3);
+
+        assertFalse(currentTime.isEmpty());
+
+        // this thread should not have blocked while waiting
+        assertTrue(System.currentTimeMillis() < tbegin + 2000);
+    }
+
+    @Test
+    public void testScheduleAtFixedRateTimerTaskDateLong() {
+        assertThatThrownBy(() -> timer.scheduleAtFixedRate(new MyTask(), new Date(), 1L))
+                        .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    private class MyTask extends TimerTask {
+        @Override
+        public void run() {
+            ++count;
+        }
+
+    }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/RunnableItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/RunnableItemTest.java
new file mode 100644 (file)
index 0000000..e7bbd01
--- /dev/null
@@ -0,0 +1,102 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.FutureTask;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RunnableItemTest {
+    private static final long DELAY_MS = 100L;
+    private static final Object ASSOCIATE = new Object();
+
+    private TestTime currentTime;
+    private int count;
+    private RunnableItem item;
+
+    /**
+     * Sets up objects, including {@link #item}.
+     */
+    @Before
+    public void setUp() {
+        currentTime = new TestTime();
+        count = 0;
+        item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, () -> count++);
+    }
+
+    @Test
+    public void testWasCancelled() {
+        assertFalse(item.wasCancelled());
+
+        FutureTask<Object> future = new FutureTask<>(() -> count++);
+        item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, future);
+        assertFalse(item.wasCancelled());
+
+        future.cancel(true);
+        assertTrue(item.wasCancelled());
+    }
+
+    @Test
+    public void testIsAssociatedWith() {
+        assertFalse(item.isAssociatedWith(this));
+        assertTrue(item.isAssociatedWith(ASSOCIATE));
+    }
+
+    @Test
+    public void testFire() {
+        item.fire();
+        assertEquals(1, count);
+
+        // verify that fire() works even if the action throws an exception
+        new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, () -> {
+            throw new RuntimeException("expected exception");
+        }).fire();
+    }
+
+    @Test
+    public void testRunnableItem_testGetAssociate_testGetAction() {
+        assertSame(ASSOCIATE, item.getAssociate());
+        assertNotNull(item.getAction());
+        assertEquals(currentTime.getMillis() + DELAY_MS, item.getNextMs());
+
+        item.getAction().run();
+        assertEquals(1, count);
+
+        // verify that work item is set when constructed with a future
+        PseudoScheduledFuture<Integer> schedFuture = new PseudoScheduledFuture<>(() -> count + 1, false);
+        item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, schedFuture);
+        assertSame(item, schedFuture.getWorkItem());
+
+        // verify that work item is NOT set when constructed with a plain future
+        item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, new FutureTask<>(() -> count + 1));
+    }
+
+    @Test
+    public void testToString() {
+        assertNotNull(item.toString());
+    }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/SleepItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/SleepItemTest.java
new file mode 100644 (file)
index 0000000..dbd5478
--- /dev/null
@@ -0,0 +1,112 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SleepItemTest {
+    private static final int SLEEP_MS = 250;
+    private static final long MAX_WAIT_MS = 5000L;
+
+    private TestTime currentTime;
+    private Thread thread;
+    private CountDownLatch started;
+    private CountDownLatch finished;
+    private volatile InterruptedException threadEx;
+    private SleepItem item;
+
+    /**
+     * Sets up objects, including {@link #item}.
+     */
+    @Before
+    public void setUp() {
+        currentTime = new TestTime();
+        started = new CountDownLatch(1);
+        finished = new CountDownLatch(1);
+
+        thread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    started.countDown();
+                    item.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    threadEx = e;
+                }
+
+                finished.countDown();
+            }
+        };
+        thread.setDaemon(true);
+
+        item = new SleepItem(currentTime, SLEEP_MS, thread);
+    }
+
+    @Test
+    public void testInterrupt() throws InterruptedException {
+        startThread();
+
+        item.interrupt();
+
+        assertTrue(finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS));
+        assertNotNull(threadEx);
+    }
+
+    @Test
+    public void testFire_testAwait() throws InterruptedException {
+        startThread();
+
+        // verify that it hasn't finished yet
+        thread.join(250);
+        assertTrue(finished.getCount() > 0);
+
+        // now fire it and verify that it finishes
+        item.fire();
+        assertTrue(finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS));
+
+        assertNull(threadEx);
+    }
+
+    @Test
+    public void testSleepItem() {
+        assertEquals(currentTime.getMillis() + SLEEP_MS, item.getNextMs());
+    }
+
+    @Test
+    public void testToString() {
+        assertNotNull(item.toString());
+    }
+
+
+    private void startThread() throws InterruptedException {
+        thread.start();
+        started.await();
+    }
+}
index f17235a..8b6501a 100644 (file)
@@ -1,6 +1,6 @@
-/*
+/*-
  * ============LICENSE_START=======================================================
- * Common Utils-Test
+ * ONAP
  * ================================================================================
  * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
 
 package org.onap.policy.common.utils.time;
 
-import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Before;
 import org.junit.Test;
 
-/**
- * Class to test TestTimeMulti.
- */
 public class TestTimeMultiTest {
+    private static final long SHORT_WAIT_MS = 100L;
+    private static final long DELAY_MS = 500L;
+    private static final long MAX_WAIT_MS = 5000L;
 
-    private static final int NTHREADS = 10;
-    private static final int NTIMES = 100;
-    private static final long WAIT_SEC = 5L;
-    private static final long MIN_SLEEP_MS = 5L;
+    private TestTimeMulti multi;
 
-    private TestTimeMulti ttm;
-    private Semaphore done;
+    @Before
+    public void setUp() {
+        multi = new TestTimeMulti();
+    }
 
     @Test
-    public void test() throws Exception {
-        ttm = new TestTimeMulti(NTHREADS);
-        done = new Semaphore(0);
+    public void testSleep() throws InterruptedException {
+        // negative sleep time
+        final long tbegin = multi.getMillis();
+        MyThread thread = new MyThread(-5);
+        thread.start();
 
-        final long tbeg = ttm.getMillis();
+        // should complete without creating a work item
+        assertTrue(thread.await());
+        assertNull(thread.ex);
 
-        // create threads
-        List<MyThread> threads = new ArrayList<>(NTHREADS);
-        for (int x = 0; x < NTHREADS; ++x) {
-            threads.add(new MyThread(x + MIN_SLEEP_MS));
-        }
+        // time should not have changed
+        assertEquals(tbegin, multi.getMillis());
+
+
+        // positive sleep time
+        thread = new MyThread(DELAY_MS);
+        thread.start();
+
+        // must execute the SleepItem
+        multi.runOneTask(MAX_WAIT_MS);
+
+        assertTrue(multi.isEmpty());
+        assertTrue(thread.await());
+        assertNull(thread.ex);
+
+        // time SHOULD HAVE changed
+        assertEquals(tbegin + DELAY_MS, multi.getMillis());
+    }
+
+    @Test
+    public void testTestTimeMulti() {
+        assertTrue(multi.getMaxWaitMs() > 0);
+    }
+
+    @Test
+    public void testTestTimeMultiLong() {
+        assertEquals(200, new TestTimeMulti(200).getMaxWaitMs());
+    }
+
+    @Test
+    public void testIsEmpty_testQueueLength() throws InterruptedException {
+        assertTrue(multi.isEmpty());
+
+        // queue up two items
+        multi.enqueue(new WorkItem(multi, DELAY_MS));
+        assertFalse(multi.isEmpty());
+        assertEquals(1, multi.queueLength());
+
+        multi.enqueue(new WorkItem(multi, DELAY_MS));
+        assertEquals(2, multi.queueLength());
+
+        // run one - should not be empty yet
+        multi.runOneTask(0);
+        assertFalse(multi.isEmpty());
+        assertEquals(1, multi.queueLength());
+
+        // run the other - should be empty now
+        multi.runOneTask(0);
+        assertTrue(multi.isEmpty());
+        assertEquals(0, multi.queueLength());
+    }
+
+    @Test
+    public void testDestroy() throws InterruptedException {
+        // this won't interrupt
+        multi.enqueue(new WorkItem(multi, DELAY_MS));
+
+        // these will interrupt
+        AtomicBoolean interrupted1 = new AtomicBoolean(false);
+        multi.enqueue(new WorkItem(multi, DELAY_MS) {
+            @Override
+            public void interrupt() {
+                interrupted1.set(true);
+            }
+        });
+
+        AtomicBoolean interrupted2 = new AtomicBoolean(false);
+        multi.enqueue(new WorkItem(multi, DELAY_MS) {
+            @Override
+            public void interrupt() {
+                interrupted2.set(true);
+            }
+        });
+
+        multi.destroy();
+        assertTrue(multi.isEmpty());
+
+        assertTrue(interrupted1.get());
+        assertTrue(interrupted2.get());
+    }
+
+    @Test
+    public void testRunOneTask() throws InterruptedException {
+        // nothing in the queue yet
+        assertFalse(multi.runOneTask(0));
+
+        // put something in the queue
+        multi.enqueue(new WorkItem(multi, DELAY_MS));
+
+        final long tbegin = multi.getMillis();
+        assertTrue(multi.runOneTask(MAX_WAIT_MS));
+
+        assertEquals(tbegin + DELAY_MS, multi.getMillis());
+
+        // nothing in the queue now
+        assertFalse(multi.runOneTask(0));
+
+        // time doesn't change
+        assertEquals(tbegin + DELAY_MS, multi.getMillis());
+    }
+
+    @Test
+    public void testWaitFor() throws InterruptedException {
+        // queue up a couple of items
+        multi.enqueue(new WorkItem(multi, DELAY_MS));
+        multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+        multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+        final long realBegin = System.currentTimeMillis();
+        final long tbegin = multi.getMillis();
+        multi.waitFor(DELAY_MS * 2 - 1);
+        assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+
+        // minimal real time should have elapsed
+        assertTrue(System.currentTimeMillis() < realBegin + TestTimeMulti.DEFAULT_MAX_WAIT_MS);
+    }
+
+    @Test
+    public void testWaitFor_EmptyQueue() throws InterruptedException {
+        multi = new TestTimeMulti(SHORT_WAIT_MS);
+
+        final long realBegin = System.currentTimeMillis();
+        final long tbegin = multi.getMillis();
+
+        multi.waitFor(2);
+
+        assertEquals(tbegin + 2, multi.getMillis());
+        assertTrue(System.currentTimeMillis() >= realBegin + SHORT_WAIT_MS);
+    }
+
+    @Test
+    public void testWaitUntilCallable() throws InterruptedException {
+        multi.enqueue(new WorkItem(multi, DELAY_MS));
+        multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+        multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+        final long tbegin = multi.getMillis();
+        AtomicInteger count = new AtomicInteger(0);
+        multi.waitUntil(() -> count.incrementAndGet() == 3);
+
+        assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+
+        // should still be one item left in the queue
+        assertEquals(1, multi.queueLength());
+        assertEquals(3, count.get());
+    }
+
+    @Test
+    public void testWaitUntilCallable_InterruptEx() throws InterruptedException {
+        multi = new TestTimeMulti();
+
+        Callable<Boolean> callable = () -> {
+            throw new InterruptedException("expected exception");
+        };
+
+        LinkedBlockingQueue<Error> errors = new LinkedBlockingQueue<>();
+
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    multi.waitUntil(callable);
+                } catch (Error ex) {
+                    errors.add(ex);
+                }
+            }
+        };
+
+        thread.start();
+
+        Error ex = errors.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+        assertNotNull(ex);
+        assertEquals("interrupted while waiting for condition: expected exception", ex.getMessage());
+    }
+
+    @Test
+    public void testWaitUntilCallable_ConditionThrowsEx() throws InterruptedException {
+        multi = new TestTimeMulti();
+
+        Callable<Boolean> callable = () -> {
+            throw new IllegalStateException("expected exception");
+        };
+
+        final long realBegin = System.currentTimeMillis();
+        assertThatThrownBy(() -> multi.waitUntil(callable))
+                        .hasMessage("condition evaluator threw an exception: expected exception");
+
+        assertTrue(System.currentTimeMillis() < realBegin + TestTimeMulti.DEFAULT_MAX_WAIT_MS);
+    }
+
+    @Test
+    public void testWaitUntilCallable_NeverSatisfied() throws InterruptedException {
+        multi = new TestTimeMulti(SHORT_WAIT_MS);
+
+        final long realBegin = System.currentTimeMillis();
+        assertThatThrownBy(() -> multi.waitUntil(() -> false))
+                        .hasMessage(TestTimeMulti.NEVER_SATISFIED);
+        assertTrue(System.currentTimeMillis() >= realBegin + SHORT_WAIT_MS);
+    }
 
-        // launch threads
-        for (MyThread thr : threads) {
-            thr.start();
+    @Test
+    public void testWaitUntilLongTimeUnitCallable() throws InterruptedException {
+        multi.enqueue(new WorkItem(multi, DELAY_MS));
+        multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+        multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+        final long tbegin = multi.getMillis();
+        AtomicInteger count = new AtomicInteger(0);
+        multi.waitUntil(DELAY_MS * 4, TimeUnit.MILLISECONDS, () -> count.incrementAndGet() == 3);
+
+        assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+
+        // should still be one item left in the queue
+        assertEquals(1, multi.queueLength());
+        assertEquals(3, count.get());
+    }
+
+    @Test
+    public void testWaitUntilLongTimeUnitCallable_PseudoTimeExpires() throws InterruptedException {
+        multi.enqueue(new WorkItem(multi, DELAY_MS));
+        multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+        multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+        final long tbegin = multi.getMillis();
+        assertThatThrownBy(() -> multi.waitUntil(DELAY_MS * 2 - 1, TimeUnit.MILLISECONDS, () -> false))
+                        .hasMessage(TestTimeMulti.NEVER_SATISFIED);
+        assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+    }
+
+    @Test
+    public void testRunItem() throws InterruptedException {
+        AtomicBoolean fired = new AtomicBoolean(false);
+        multi.enqueue(new MyWorkItem(fired));
+
+        assertTrue(multi.runOneTask(1));
+
+        // should no longer be in the queue
+        assertTrue(multi.isEmpty());
+
+        // should have been fired
+        assertTrue(fired.get());
+    }
+
+    @Test
+    public void testRunItem_Rescheduled() throws InterruptedException {
+        AtomicBoolean fired = new AtomicBoolean(false);
+
+        multi.enqueue(new MyWorkItem(fired) {
+            @Override
+            public boolean bumpNextTime() {
+                bumpNextTime(DELAY_MS);
+                return true;
+            }
+        });
+
+        assertTrue(multi.runOneTask(1));
+
+        // should still be in the queue
+        assertEquals(1, multi.queueLength());
+
+        // should have been fired
+        assertTrue(fired.get());
+    }
+
+    @Test
+    public void testRunItem_Canceled() throws InterruptedException {
+        AtomicBoolean fired = new AtomicBoolean(false);
+
+        multi.enqueue(new MyWorkItem(fired) {
+            @Override
+            public boolean wasCancelled() {
+                return true;
+            }
+
+            @Override
+            public boolean bumpNextTime() {
+                return true;
+            }
+        });
+
+        final long tbegin = multi.getMillis();
+        assertTrue(multi.runOneTask(1));
+
+        // time should be unchanged
+        assertEquals(tbegin, multi.getMillis());
+
+        assertTrue(multi.isEmpty());
+
+        // should not have been fired
+        assertFalse(fired.get());
+    }
+
+    @Test
+    public void testEnqueue() throws InterruptedException {
+        CountDownLatch started = new CountDownLatch(1);
+        CountDownLatch finished = new CountDownLatch(1);
+        AtomicReference<InterruptedException> ex = new AtomicReference<>();
+
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+                started.countDown();
+
+                try {
+                    multi.runOneTask(DELAY_MS * 3);
+                } catch (InterruptedException e) {
+                    ex.set(e);
+                }
+
+                finished.countDown();
+            }
+        };
+
+        thread.start();
+
+        // wait for thread to start
+        started.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+        // wait for it to block on the lock
+        await().atMost(MAX_WAIT_MS, TimeUnit.MILLISECONDS).until(() -> thread.getState() == Thread.State.TIMED_WAITING);
+
+        // add an item to the queue - should trigger the thread to continue
+        multi.enqueue(new WorkItem(multi, DELAY_MS));
+
+        assertTrue(finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS));
+        assertNull(ex.get());
+    }
+
+    @Test
+    public void testCancelItems() throws InterruptedException {
+        AtomicBoolean fired1 = new AtomicBoolean();
+        multi.enqueue(new MyWorkItem(fired1));
+
+        AtomicBoolean fired2 = new AtomicBoolean();
+        multi.enqueue(new MyWorkItem(fired2));
+        multi.enqueue(new MyWorkItem(fired2));
+
+        AtomicBoolean fired3 = new AtomicBoolean();
+        multi.enqueue(new MyWorkItem(fired3));
+
+        // cancel some
+        multi.cancelItems(fired2);
+
+        // should have only canceled two of them
+        assertEquals(2, multi.queueLength());
+
+        // fire both
+        multi.runOneTask(0);
+        multi.runOneTask(0);
+
+        // these should have fired
+        assertTrue(fired1.get());
+        assertTrue(fired3.get());
+
+        // these should NOT have fired
+        assertFalse(fired2.get());
+    }
+
+    @Test
+    public void testPurgeItems() throws InterruptedException {
+        AtomicBoolean fired = new AtomicBoolean();
+
+        // queue up two that are canceled, one that is not
+        multi.enqueue(new MyWorkItem(true));
+        multi.enqueue(new MyWorkItem(fired));
+        multi.enqueue(new MyWorkItem(true));
+
+        multi.purgeItems();
+
+        assertEquals(1, multi.queueLength());
+
+        multi.runOneTask(0);
+        assertTrue(fired.get());
+    }
+
+    private class MyWorkItem extends WorkItem {
+        private final AtomicBoolean fired;
+        private final boolean canceled;
+
+        public MyWorkItem(AtomicBoolean fired) {
+            super(multi, DELAY_MS);
+            this.fired = fired;
+            this.canceled = false;
         }
 
-        // wait for each one to complete
-        for (MyThread thr : threads) {
-            assertTrue("complete " + thr.getSleepMs(), done.tryAcquire(WAIT_SEC, TimeUnit.SECONDS));
-            ttm.threadCompleted();
+        public MyWorkItem(boolean canceled) {
+            super(multi, DELAY_MS);
+            this.fired = new AtomicBoolean();
+            this.canceled = canceled;
         }
 
-        // check results
-        for (MyThread thr : threads) {
-            assertEquals("time " + thr.getSleepMs(), thr.texpected, thr.tactual);
+        @Override
+        public void fire() {
+            fired.set(true);
         }
 
-        assertTrue(ttm.getMillis() >= tbeg + NTIMES * MIN_SLEEP_MS);
+        @Override
+        public boolean isAssociatedWith(Object associate) {
+            return (fired == associate);
+        }
 
-        // something in the queue, but no threads remain -> exception
-        assertThatIllegalStateException().isThrownBy(() -> ttm.threadCompleted());
+        @Override
+        public boolean wasCancelled() {
+            return canceled;
+        }
     }
 
     private class MyThread extends Thread {
-
         private final long sleepMs;
-
-        private volatile long texpected;
-        private volatile long tactual;
+        private final CountDownLatch finished = new CountDownLatch(1);
+        private InterruptedException ex = null;
 
         public MyThread(long sleepMs) {
             this.sleepMs = sleepMs;
-
             this.setDaemon(true);
         }
 
-        public long getSleepMs() {
-            return sleepMs;
+        public boolean await() throws InterruptedException {
+            return finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
         }
 
         @Override
         public void run() {
             try {
-                for (int x = 0; x < NTIMES; ++x) {
-                    // negative sleep should have no effect
-                    texpected = ttm.getMillis();
-                    ttm.sleep(-1);
-                    if ((tactual = ttm.getMillis()) != texpected) {
-                        break;
-                    }
-
-                    texpected = ttm.getMillis() + sleepMs;
-                    ttm.sleep(sleepMs);
-
-                    if ((tactual = ttm.getMillis()) != texpected) {
-                        break;
-                    }
-
-                    if ((tactual = ttm.getDate().getTime()) != texpected) {
-                        break;
-                    }
-                }
-
-            } catch (InterruptedException expected) {
+                multi.sleep(sleepMs);
+            } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
+                ex = e;
             }
 
-            done.release();
+            finished.countDown();
         }
     }
 }
index 3e7897e..d2cf678 100644 (file)
@@ -2,14 +2,14 @@
  * ============LICENSE_START=======================================================
  * Common Utils-Test
  * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -62,6 +62,11 @@ public class TestTimeTest {
 
         // ensure that no real time has elapsed
         assertTrue(System.currentTimeMillis() < treal + tsleep / 2);
+
+        // negative sleep should not modify the time
+        tcur = tm.getMillis();
+        tm.sleep(-1);
+        assertEquals(tcur, tm.getMillis());
     }
 
 }
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/WorkItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/WorkItemTest.java
new file mode 100644 (file)
index 0000000..4e6f92b
--- /dev/null
@@ -0,0 +1,100 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class WorkItemTest {
+    private TestTime currentTime;
+    private WorkItem item;
+
+    @Before
+    public void setUp() {
+        currentTime = new TestTime();
+        item = new WorkItem(currentTime, 0);
+    }
+
+    @Test
+    public void testWorkItem() {
+        assertThatIllegalArgumentException().isThrownBy(() -> new WorkItem(currentTime, -1));
+
+        // should not throw an exception
+        new WorkItem(currentTime, 1);
+    }
+
+    @Test
+    public void testGetDelay() {
+        assertEquals(1, item.getDelay());
+    }
+
+    @Test
+    public void testWasCancelled() {
+        assertFalse(item.wasCancelled());
+    }
+
+    @Test
+    public void testBumpNextTime() {
+        assertFalse(item.bumpNextTime());
+    }
+
+    @Test
+    public void testBumpNextTimeLong() {
+        assertThatIllegalArgumentException().isThrownBy(() -> item.bumpNextTime(-1));
+
+        long cur = currentTime.getMillis();
+        item.bumpNextTime(5);
+        assertEquals(cur + 5, item.getNextMs());
+
+        item.bumpNextTime(0);
+
+        // should bump the time by at least 1
+        assertEquals(cur + 1, item.getNextMs());
+    }
+
+    @Test
+    public void testInterrupt() {
+        item.interrupt();
+        assertFalse(Thread.interrupted());
+    }
+
+    @Test
+    public void testIsAssociatedWith() {
+        assertFalse(item.isAssociatedWith(this));
+    }
+
+    @Test
+    public void testFire() {
+        // ensure no exception is thrown
+        item.fire();
+    }
+
+    @Test
+    public void testGetNextMs() {
+        assertEquals(currentTime.getMillis() + 1, item.getNextMs());
+        assertEquals(currentTime.getMillis() + 10, new WorkItem(currentTime, 10).getNextMs());
+    }
+
+}