2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.pap.main.comm;
23 import java.util.LinkedHashMap;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.Semaphore;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Consumer;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Manager of timers. All of the timers for a given manager have the same wait time, which
34 * makes it possible to use a linked hash map to track the timers. As a result, timers can
35 * be quickly added and removed. In addition, the expiration time of any new timer is
36 * always greater than or equal to that of the timers already in the map. Consequently,
37 * the map's iterator will go in ascending order from the minimum expiration time to
38 * the maximum expiration time.
40 * <p>This class has not been tested with multiple threads invoking {@link #run()} concurrently.
43 public class TimerManager implements Runnable {
44 private static final Logger logger = LoggerFactory.getLogger(TimerManager.class);
47 * Name of this manager, used for logging purposes.
49 private final String name;
52 * Time that each new timer should wait.
54 private final long waitTimeMs;
57 * When the map is empty, the timer thread will block waiting for this semaphore. When
58 * a new timer is added to the map, the semaphore will be released, thus allowing the
59 * timer thread to progress.
61 private final Semaphore sem = new Semaphore(0);
64 * This is decremented to indicate that this manager should be stopped.
66 private final CountDownLatch stopper = new CountDownLatch(1);
69 * Used to lock updates to the map.
71 private final Object lockit = new Object();
74 * Maps a timer name to a timer.
76 private final Map<String, Timer> name2timer = new LinkedHashMap<>();
79 * Constructs the object.
81 * @param name name of this manager, used for logging purposes
82 * @param waitTimeMs time that each new timer should wait
84 public TimerManager(String name, long waitTimeMs) {
86 this.waitTimeMs = waitTimeMs;
90 * Stops the timer thread.
93 logger.info("timer manager {} stopping", name);
95 // Note: Must decrement the latch BEFORE releasing the semaphore
101 * Registers a timer with the given name. When the timer expires, it is automatically
102 * unregistered and then executed.
104 * @param timerName name of the timer to register
105 * @param action action to take when the timer expires; the "timerName" is passed as its argument
109 public Timer register(String timerName, Consumer<String> action) {
111 synchronized (lockit) {
112 // always remove existing entry so that new entry goes at the end of the map
113 var timer = name2timer.remove(timerName);
115 logger.info("{} timer replaced {}", name, timer);
118 timer = new Timer(timerName, action);
119 name2timer.put(timerName, timer);
121 logger.info("{} timer registered {}", name, timer);
123 // release the timer thread in case it's waiting
131 * Continuously processes timers until {@link #stop()} is invoked.
135 logger.info("timer manager {} started", name);
137 while (stopper.getCount() > 0) {
145 } catch (InterruptedException e) {
146 logger.warn("timer manager {} stopping due to interrupt", name);
148 Thread.currentThread().interrupt();
152 logger.info("timer manager {} stopped", name);
156 * Process all timers, continuously, as long as a timer remains in the map (and
157 * {@link #stop()} has not been called).
159 * @throws InterruptedException if the thread is interrupted
161 private void processTimers() throws InterruptedException {
163 while ((timer = getNextTimer()) != null && stopper.getCount() > 0) {
169 * Gets the timer that will expire first.
171 * @return the timer that will expire first, or {@code null} if there are no timers
173 private Timer getNextTimer() {
175 synchronized (lockit) {
176 if (name2timer.isEmpty()) {
180 // use an iterator to get the first timer in the map
181 return name2timer.values().iterator().next();
186 * Processes a timer: waits until it expires, unregisters it, and then executes its action.
189 * @param timer timer to process
190 * @throws InterruptedException if the thread is interrupted
192 private void processTimer(Timer timer) throws InterruptedException {
195 if (stopper.getCount() == 0) {
200 if (!timer.cancel("expired")) {
201 // timer was cancelled while we were waiting
208 logger.info("{} timer firing {}", TimerManager.this.name, timer);
209 timer.runner.accept(timer.name);
210 } catch (RuntimeException e) {
211 logger.warn("{} timer threw an exception {}", TimerManager.this.name, timer, e);
225 * Time, in milliseconds, when the timer will expire.
227 private long expireMs;
230 * Action to take when the timer expires.
232 private Consumer<String> runner;
235 private Timer(String name, Consumer<String> runner2) {
237 this.expireMs = waitTimeMs + currentTimeMillis();
238 this.runner = runner2;
241 private void await() throws InterruptedException {
242 // wait for it to expire, if necessary
243 long tleft = expireMs - currentTimeMillis();
245 logger.info("{} timer waiting {}ms {}", TimerManager.this.name, tleft, this);
253 * @return {@code true} if the timer was cancelled, {@code false} if the timer was no longer registered
256 public boolean cancel() {
257 return cancel("cancelled");
263 * @param cancelMsg message to log if the timer is successfully cancelled (it is also logged if the timer is discarded)
265 * @return {@code true} if the timer was cancelled, {@code false} if the timer was no longer registered
268 private boolean cancel(String cancelMsg) {
270 synchronized (lockit) {
271 if (!name2timer.remove(name, this)) {
272 // have a new timer in the map - ignore "this" timer
273 logger.info("{} timer discarded ({}) {}", TimerManager.this.name, cancelMsg, this);
277 logger.info("{} timer {} {}", TimerManager.this.name, cancelMsg, this);
283 public String toString() {
284 return "Timer [name=" + name + ", expireMs=" + expireMs + "]";
288 // these may be overridden by junit tests
291 * Gets the current time, in milliseconds.
293 * @return the current time, in milliseconds
295 protected long currentTimeMillis() {
296 return System.currentTimeMillis();
300 * "Sleeps" for a bit, stopping if {@link #stop()} is invoked.
302 * @param timeMs time, in milliseconds, to sleep
303 * @throws InterruptedException if this thread is interrupted while sleeping
305 protected void sleep(long timeMs) throws InterruptedException {
306 if (stopper.await(timeMs, TimeUnit.MILLISECONDS)) {
307 logger.info("sleep finishing due to stop()");