/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.policy.distributed.locking;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTransientException;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.onap.policy.common.utils.network.NetworkUtil;
import org.onap.policy.drools.core.lock.AlwaysFailLock;
import org.onap.policy.drools.core.lock.Lock;
import org.onap.policy.drools.core.lock.LockCallback;
import org.onap.policy.drools.core.lock.LockImpl;
import org.onap.policy.drools.core.lock.LockState;
import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import org.onap.policy.drools.system.PolicyEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
59 * Distributed implementation of the Lock Feature. Maintains locks across servers using a
63 * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
69 * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
70 * instead populated with the {@link #uuidString}.</li>
71 * <li>A periodic check of the DB is made to determine if any of the locks have
73 * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
74 * will be added to the map once free() or extend() is invoked, provided there isn't
75 * already an entry. In addition, it initially has the host and UUID of the feature
76 * instance that created it. However, as soon as doExtend() completes successfully, the
77 * host and UUID of the lock will be updated to reflect the values within this feature
81 public class DistributedLockManager implements PolicyResourceLockManager, PolicyEngineFeatureApi {
83 private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
85 private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
86 private static final String LOCK_LOST_MSG = "lock lost";
87 private static final String NOT_LOCKED_MSG = "not locked";
89 @Getter(AccessLevel.PROTECTED)
90 @Setter(AccessLevel.PROTECTED)
91 private static DistributedLockManager latestInstance = null;
95 * Name of the host on which this JVM is running.
98 private final String hostName;
101 * UUID of this object.
104 private final String uuidString = UUID.randomUUID().toString();
107 * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
108 * lock is added to the map, it remains in the map until the lock is lost or until the
109 * unlock request completes.
111 private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
114 * Engine with which this manager is associated.
116 private PolicyEngine engine;
119 * Feature properties.
121 private DistributedLockProperties featProps;
124 * Thread pool used to check for lock expiration and to notify owners when locks are
127 private ScheduledExecutorService exsvc = null;
130 * Data source used to connect to the DB.
132 private BasicDataSource dataSource = null;
136 * Constructs the object.
138 public DistributedLockManager() {
139 this.hostName = NetworkUtil.getHostname();
143 public int getSequenceNumber() {
148 public boolean isAlive() {
149 return (exsvc != null);
153 public boolean start() {
154 // handled via engine API
159 public boolean stop() {
160 // handled via engine API
165 public void shutdown() {
166 // handled via engine API
170 public boolean isLocked() {
175 public boolean lock() {
180 public boolean unlock() {
185 public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
188 this.engine = engine;
189 this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
190 this.exsvc = getThreadPool();
191 this.dataSource = makeDataSource();
195 } catch (Exception e) {
196 throw new DistributedLockManagerException(e);
201 public boolean afterStart(PolicyEngine engine) {
204 exsvc.execute(this::deleteExpiredDbLocks);
205 exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
207 setLatestInstance(this);
209 } catch (Exception e) {
210 throw new DistributedLockManagerException(e);
219 * @return a new, pooled data source
220 * @throws Exception exception
222 protected BasicDataSource makeDataSource() throws Exception {
223 Properties props = new Properties();
224 props.put("driverClassName", featProps.getDbDriver());
225 props.put("url", featProps.getDbUrl());
226 props.put("username", featProps.getDbUser());
227 props.put("password", featProps.getDbPwd());
228 props.put("testOnBorrow", "true");
229 props.put("poolPreparedStatements", "true");
231 // additional properties are listed in the GenericObjectPool API
233 return BasicDataSourceFactory.createDataSource(props);
237 * Deletes expired locks from the DB.
239 private void deleteExpiredDbLocks() {
240 logger.info("deleting all expired locks from the DB");
242 try (Connection conn = dataSource.getConnection();
243 PreparedStatement stmt = conn
244 .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
246 int ndel = stmt.executeUpdate();
247 logger.info("deleted {} expired locks from the DB", ndel);
249 } catch (SQLException e) {
250 logger.warn("failed to delete expired locks from the DB", e);
255 * Closes the data source. Does <i>not</i> invoke any lock call-backs.
258 public boolean afterStop(PolicyEngine engine) {
265 * Closes {@link #dataSource} and sets it to {@code null}.
267 private void closeDataSource() {
269 if (dataSource != null) {
273 } catch (SQLException e) {
274 logger.error("cannot close the distributed locking DB", e);
281 public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
282 boolean waitForLock) {
284 if (latestInstance != this) {
285 AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
286 lock.notifyUnavailable();
290 DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
292 DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
294 // do these outside of compute() to avoid blocking other map operations
295 if (existingLock == null) {
296 logger.debug("added lock to map {}", lock);
297 lock.scheduleRequest(lock::doLock);
299 lock.deny("resource is busy", true);
306 * Checks for expired locks.
308 private void checkExpired() {
311 logger.info("checking for expired locks");
312 Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
313 identifyDbLocks(expiredIds);
314 expireLocks(expiredIds);
316 exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
318 } catch (RejectedExecutionException e) {
319 logger.warn("thread pool is no longer accepting requests", e);
321 } catch (SQLException | RuntimeException e) {
322 logger.error("error checking expired locks", e);
323 exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
326 logger.info("done checking for expired locks");
330 * Identifies this feature instance's locks that the DB indicates are still active.
332 * @param expiredIds IDs of resources that have expired locks. If a resource is still
333 * locked, it's ID is removed from this set
334 * @throws SQLException if a DB error occurs
336 private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
338 * We could query for host and UUIDs that actually appear within the locks, but
339 * those might change while the query is running so no real value in doing that.
340 * On the other hand, there's only a brief instance between the time a
341 * deserialized lock is added to this feature instance and its doExtend() method
342 * updates its host and UUID to match this feature instance. If this happens to
343 * run during that brief instance, then the lock will be lost and the callback
344 * invoked. It isn't worth complicating this code further to handle those highly
349 try (Connection conn = dataSource.getConnection();
350 PreparedStatement stmt = conn.prepareStatement(
351 "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
354 stmt.setString(1, hostName);
355 stmt.setString(2, uuidString);
357 try (ResultSet resultSet = stmt.executeQuery()) {
358 while (resultSet.next()) {
359 String resourceId = resultSet.getString(1);
361 // we have now seen this resource id
362 expiredIds.remove(resourceId);
369 * Expires locks for the resources that no longer appear within the DB.
371 * @param expiredIds IDs of resources that have expired locks
373 private void expireLocks(Set<String> expiredIds) {
374 for (String resourceId : expiredIds) {
375 AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
377 resource2lock.computeIfPresent(resourceId, (key, lock) -> {
378 if (lock.isActive()) {
379 // it thinks it's active, but it isn't - remove from the map
387 DistributedLock lock = lockref.get();
389 logger.debug("removed lock from map {}", lock);
390 lock.deny(LOCK_LOST_MSG, false);
396 * Distributed Lock implementation.
398 public static class DistributedLock extends LockImpl {
399 private static final String SQL_FAILED_MSG = "request failed for lock: {}";
401 private static final long serialVersionUID = 1L;
404 * Feature containing this lock. May be {@code null} until the feature is
405 * identified. Note: this can only be null if the lock has been de-serialized.
407 private transient DistributedLockManager feature;
410 * Host name from the feature instance that created this object. Replaced with the
411 * host name from the current feature instance whenever the lock is successfully
414 private String hostName;
417 * UUID string from the feature instance that created this object. Replaced with
418 * the UUID string from the current feature instance whenever the lock is
419 * successfully extended.
421 private String uuidString;
424 * {@code True} if the lock is busy making a request, {@code false} otherwise.
426 private transient boolean busy = false;
429 * Request to be performed.
431 private transient RunnableWithEx request = null;
434 * Number of times we've retried a request.
436 private transient int nretries = 0;
439 * Constructs the object.
441 public DistributedLock() {
443 this.uuidString = "";
447 * Constructs the object.
449 * @param state initial state of the lock
450 * @param resourceId identifier of the resource to be locked
451 * @param ownerKey information identifying the owner requesting the lock
452 * @param holdSec amount of time, in seconds, for which the lock should be held,
453 * after which it will automatically be released
454 * @param callback callback to be invoked once the lock is granted, or
455 * subsequently lost; must not be {@code null}
456 * @param feature feature containing this lock
458 public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
459 DistributedLockManager feature) {
460 super(state, resourceId, ownerKey, holdSec, callback);
462 this.feature = feature;
463 this.hostName = feature.hostName;
464 this.uuidString = feature.uuidString;
468 * Grants this lock. The notification is <i>always</i> invoked via the
469 * <i>foreground</i> thread.
471 protected void grant() {
472 synchronized (this) {
473 if (isUnavailable()) {
477 setState(LockState.ACTIVE);
480 logger.info("lock granted: {}", this);
486 * Permanently denies this lock.
488 * @param reason the reason the lock was denied
489 * @param foreground {@code true} if the callback can be invoked in the current
490 * (i.e., foreground) thread, {@code false} if it should be invoked via the
493 protected void deny(String reason, boolean foreground) {
494 synchronized (this) {
495 setState(LockState.UNAVAILABLE);
498 logger.info("{}: {}", reason, this);
500 if (feature == null || foreground) {
504 feature.exsvc.execute(this::notifyUnavailable);
509 public boolean free() {
510 // do a quick check of the state
511 if (isUnavailable()) {
515 logger.info("releasing lock: {}", this);
517 if (!attachFeature()) {
518 setState(LockState.UNAVAILABLE);
522 AtomicBoolean result = new AtomicBoolean(false);
524 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
525 if (curlock == this && !isUnavailable()) {
526 // this lock was the owner
528 setState(LockState.UNAVAILABLE);
531 * NOTE: do NOT return null; curlock must remain until doUnlock
540 scheduleRequest(this::doUnlock);
548 public void extend(int holdSec, LockCallback callback) {
550 throw new IllegalArgumentException("holdSec is negative");
554 setCallback(callback);
556 // do a quick check of the state
557 if (isUnavailable() || !attachFeature()) {
558 deny(LOCK_LOST_MSG, true);
562 AtomicBoolean success = new AtomicBoolean(false);
564 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
565 if (curlock == this && !isUnavailable()) {
567 setState(LockState.WAITING);
570 // note: leave it in the map until doUnlock() removes it
576 scheduleRequest(this::doExtend);
579 deny(NOT_LOCKED_MSG, true);
584 * Attaches to the feature instance, if not already attached.
586 * @return {@code true} if the lock is now attached to a feature, {@code false}
589 private synchronized boolean attachFeature() {
590 if (feature != null) {
595 feature = latestInstance;
596 if (feature == null) {
597 logger.warn("no feature yet for {}", this);
601 // put this lock into the map
602 feature.resource2lock.putIfAbsent(getResourceId(), this);
608 * Schedules a request for execution.
610 * @param schedreq the request that should be scheduled
612 private synchronized void scheduleRequest(RunnableWithEx schedreq) {
613 logger.debug("schedule lock action {}", this);
616 feature.exsvc.execute(this::doRequest);
620 * Reschedules a request for execution, if there is not already a request in the
621 * queue, and if the retry count has not been exhausted.
623 * @param req request to be rescheduled
625 private void rescheduleRequest(RunnableWithEx req) {
626 synchronized (this) {
627 if (request != null) {
628 // a new request has already been scheduled - it supersedes "req"
629 logger.debug("not rescheduling lock action {}", this);
633 if (nretries++ < feature.featProps.getMaxRetries()) {
634 logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
636 feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
641 logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
646 * Gets, and removes, the next request from the queue. Clears {@link #busy} if
647 * there are no more requests in the queue.
649 * @param prevReq the previous request that was just run
651 * @return the next request, or {@code null} if the queue is empty
653 private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
654 if (request == null || request == prevReq) {
655 logger.debug("no more requests for {}", this);
660 RunnableWithEx req = request;
667 * Executes the current request, if none are currently executing.
669 private void doRequest() {
670 synchronized (this) {
672 // another thread is already processing the request(s)
679 * There is a race condition wherein this thread could invoke run() while the
680 * next scheduled thread checks the busy flag and finds that work is being
681 * done and returns, leaving the next work item in "request". In that case,
682 * the next work item may never be executed, thus we use a loop here, instead
683 * of just executing a single request.
685 RunnableWithEx req = null;
686 while ((req = getNextRequest(req)) != null) {
687 if (feature.resource2lock.get(getResourceId()) != this) {
689 * no longer in the map - don't apply the action, as it may interfere
690 * with any newly added Lock object
692 logger.debug("discard lock action {}", this);
693 synchronized (this) {
701 * Run the request. If it throws an exception, then it will be
702 * rescheduled for execution a little later.
706 } catch (SQLException e) {
707 logger.warn(SQL_FAILED_MSG, this, e);
709 if (e.getCause() instanceof SQLTransientException) {
710 // retry the request a little later
711 rescheduleRequest(req);
716 } catch (RuntimeException e) {
717 logger.warn(SQL_FAILED_MSG, this, e);
724 * Attempts to add a lock to the DB. Generates a callback, indicating success or
727 * @throws SQLException if a DB error occurs
729 private void doLock() throws SQLException {
731 logger.debug("discard doLock {}", this);
736 * There is a small window in which a client could invoke free() before the DB
737 * is updated. In that case, doUnlock will be added to the queue to run after
738 * this, which will delete the record, as desired. In addition, grant() will
739 * not do anything, because the lock state will have been set to UNAVAILABLE
743 logger.debug("doLock {}", this);
744 try (Connection conn = feature.dataSource.getConnection()) {
745 boolean success = false;
747 success = doDbInsert(conn);
749 } catch (SQLException e) {
750 logger.info("failed to insert lock record - attempting update: {}", this, e);
751 success = doDbUpdate(conn);
764 * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
765 * it fails, as this should only be executed in response to a call to
768 * @throws SQLException if a DB error occurs
770 private void doUnlock() throws SQLException {
771 logger.debug("unlock {}", this);
772 try (Connection conn = feature.dataSource.getConnection()) {
780 * Attempts to extend a lock in the DB. Generates a callback, indicating success
783 * @throws SQLException if a DB error occurs
785 private void doExtend() throws SQLException {
787 logger.debug("discard doExtend {}", this);
792 * There is a small window in which a client could invoke free() before the DB
793 * is updated. In that case, doUnlock will be added to the queue to run after
794 * this, which will delete the record, as desired. In addition, grant() will
795 * not do anything, because the lock state will have been set to UNAVAILABLE
799 logger.debug("doExtend {}", this);
800 try (Connection conn = feature.dataSource.getConnection()) {
802 * invoker may have called extend() before free() had a chance to insert
803 * the record, thus we have to try to insert, if the update fails
805 if (doDbUpdate(conn) || doDbInsert(conn)) {
815 * Inserts the lock into the DB.
817 * @param conn DB connection
818 * @return {@code true} if a record was successfully inserted, {@code false}
820 * @throws SQLException if a DB error occurs
822 protected boolean doDbInsert(Connection conn) throws SQLException {
823 logger.debug("insert lock record {}", this);
824 try (PreparedStatement stmt =
825 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
826 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
828 stmt.setString(1, getResourceId());
829 stmt.setString(2, feature.hostName);
830 stmt.setString(3, feature.uuidString);
831 stmt.setInt(4, getHoldSec());
833 stmt.executeUpdate();
835 this.hostName = feature.hostName;
836 this.uuidString = feature.uuidString;
843 * Updates the lock in the DB.
845 * @param conn DB connection
846 * @return {@code true} if a record was successfully updated, {@code false}
848 * @throws SQLException if a DB error occurs
850 protected boolean doDbUpdate(Connection conn) throws SQLException {
851 logger.debug("update lock record {}", this);
852 try (PreparedStatement stmt =
853 conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
854 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
855 + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
857 stmt.setString(1, getResourceId());
858 stmt.setString(2, feature.hostName);
859 stmt.setString(3, feature.uuidString);
860 stmt.setInt(4, getHoldSec());
862 stmt.setString(5, getResourceId());
863 stmt.setString(6, this.hostName);
864 stmt.setString(7, this.uuidString);
866 if (stmt.executeUpdate() != 1) {
870 this.hostName = feature.hostName;
871 this.uuidString = feature.uuidString;
878 * Deletes the lock from the DB.
880 * @param conn DB connection
881 * @throws SQLException if a DB error occurs
883 protected void doDbDelete(Connection conn) throws SQLException {
884 logger.debug("delete lock record {}", this);
885 try (PreparedStatement stmt =
886 conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
888 stmt.setString(1, getResourceId());
889 stmt.setString(2, this.hostName);
890 stmt.setString(3, this.uuidString);
892 stmt.executeUpdate();
897 * Removes the lock from the map, and sends a notification using the current
900 private void removeFromMap() {
901 logger.debug("remove lock from map {}", this);
902 feature.resource2lock.remove(getResourceId(), this);
904 synchronized (this) {
905 if (!isUnavailable()) {
906 deny(LOCK_LOST_MSG, true);
912 public String toString() {
913 return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
914 + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
920 private static interface RunnableWithEx {
921 void run() throws SQLException;
924 // these may be overridden by junit tests
926 protected Properties getProperties(String fileName) {
927 return SystemPersistenceConstants.getManager().getProperties(fileName);
930 protected ScheduledExecutorService getThreadPool() {
931 return engine.getExecutorService();
934 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
935 LockCallback callback) {
936 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);