2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import java.sql.Connection;
24 import java.sql.PreparedStatement;
25 import java.sql.ResultSet;
26 import java.sql.SQLException;
27 import java.util.HashSet;
29 import java.util.Properties;
31 import java.util.UUID;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.RejectedExecutionException;
34 import java.util.concurrent.ScheduledExecutorService;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38 import lombok.AccessLevel;
41 import org.apache.commons.dbcp2.BasicDataSource;
42 import org.apache.commons.dbcp2.BasicDataSourceFactory;
43 import org.onap.policy.common.utils.network.NetworkUtil;
44 import org.onap.policy.drools.core.lock.AlwaysFailLock;
45 import org.onap.policy.drools.core.lock.Lock;
46 import org.onap.policy.drools.core.lock.LockCallback;
47 import org.onap.policy.drools.core.lock.LockImpl;
48 import org.onap.policy.drools.core.lock.LockState;
49 import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
50 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
51 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
52 import org.onap.policy.drools.system.PolicyEngine;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
58 * Distributed implementation of the Lock Feature. Maintains locks across servers using a
62 * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
68 * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
69 * instead populated with the {@link #uuidString}.</li>
70 * <li>A periodic check of the DB is made to determine if any of the locks have expired.</li>
72 * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
73 * will be added to the map once free() or extend() is invoked, provided there isn't
74 * already an entry. In addition, it initially has the host and UUID of the feature
75 * instance that created it. However, as soon as doExtend() completes successfully, the
76 * host and UUID of the lock will be updated to reflect the values within this feature
80 public class DistributedLockManager
81 implements PolicyResourceLockManager, PolicyEngineFeatureApi {
83 private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
85 private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
86 private static final String LOCK_LOST_MSG = "lock lost";
87 private static final String NOT_LOCKED_MSG = "not locked";
89 @Getter(AccessLevel.PROTECTED)
90 @Setter(AccessLevel.PROTECTED)
91 private static DistributedLockManager latestInstance = null;
95 * Name of the host on which this JVM is running.
98 private final String hostName;
101 * UUID of this object.
104 private final String uuidString = UUID.randomUUID().toString();
107 * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
108 * lock is added to the map, it remains in the map until the lock is lost or until the
109 * unlock request completes.
111 private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
114 * Engine with which this manager is associated.
116 private PolicyEngine engine;
119 * Feature properties.
121 private DistributedLockProperties featProps;
124 * Thread pool used to check for lock expiration and to notify owners when locks are
127 private ScheduledExecutorService exsvc = null;
130 * Data source used to connect to the DB.
132 private BasicDataSource dataSource = null;
/**
 * Creates the lock manager, capturing the name of the host on which this JVM is
 * running.
 */
public DistributedLockManager() {
    hostName = NetworkUtil.getHostname();
}
143 public int getSequenceNumber() {
148 public boolean isAlive() {
149 return (exsvc != null);
153 public boolean start() {
154 // handled via engine API
159 public boolean stop() {
160 // handled via engine API
165 public void shutdown() {
166 // handled via engine API
170 public boolean isLocked() {
175 public boolean lock() {
180 public boolean unlock() {
185 public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
188 this.engine = engine;
189 this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
190 this.exsvc = getThreadPool();
191 this.dataSource = makeDataSource();
195 } catch (Exception e) {
196 throw new DistributedLockManagerException(e);
201 public boolean afterStart(PolicyEngine engine) {
204 exsvc.execute(this::deleteExpiredDbLocks);
205 exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
207 setLatestInstance(this);
209 } catch (Exception e) {
210 throw new DistributedLockManagerException(e);
219 * @return a new, pooled data source
220 * @throws Exception exception
222 protected BasicDataSource makeDataSource() throws Exception {
223 Properties props = new Properties();
224 props.put("driverClassName", featProps.getDbDriver());
225 props.put("url", featProps.getDbUrl());
226 props.put("username", featProps.getDbUser());
227 props.put("password", featProps.getDbPwd());
228 props.put("testOnBorrow", "true");
229 props.put("poolPreparedStatements", "true");
231 // additional properties are listed in the GenericObjectPool API
233 return BasicDataSourceFactory.createDataSource(props);
237 * Deletes expired locks from the DB.
239 private void deleteExpiredDbLocks() {
240 logger.info("deleting all expired locks from the DB");
242 try (Connection conn = dataSource.getConnection();
243 PreparedStatement stmt = conn
244 .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
246 int ndel = stmt.executeUpdate();
247 logger.info("deleted {} expired locks from the DB", ndel);
249 } catch (SQLException e) {
250 logger.warn("failed to delete expired locks from the DB", e);
255 * Closes the data source. Does <i>not</i> invoke any lock call-backs.
258 public boolean afterStop(PolicyEngine engine) {
265 * Closes {@link #dataSource} and sets it to {@code null}.
267 private void closeDataSource() {
269 if (dataSource != null) {
273 } catch (SQLException e) {
274 logger.error("cannot close the distributed locking DB", e);
281 public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
282 boolean waitForLock) {
284 if (latestInstance != this) {
285 AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
286 lock.notifyUnavailable();
290 DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
292 DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
294 // do these outside of compute() to avoid blocking other map operations
295 if (existingLock == null) {
296 logger.debug("added lock to map {}", lock);
297 lock.scheduleRequest(lock::doLock);
299 lock.deny("resource is busy", true);
306 * Checks for expired locks.
308 private void checkExpired() {
311 logger.info("checking for expired locks");
312 Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
313 identifyDbLocks(expiredIds);
314 expireLocks(expiredIds);
316 exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
318 } catch (RejectedExecutionException e) {
319 logger.warn("thread pool is no longer accepting requests", e);
321 } catch (SQLException | RuntimeException e) {
322 logger.error("error checking expired locks", e);
323 exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
326 logger.info("done checking for expired locks");
330 * Identifies this feature instance's locks that the DB indicates are still active.
332 * @param expiredIds IDs of resources that have expired locks. If a resource is still
333 * locked, it's ID is removed from this set
334 * @throws SQLException if a DB error occurs
336 private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
338 * We could query for host and UUIDs that actually appear within the locks, but
339 * those might change while the query is running so no real value in doing that.
340 * On the other hand, there's only a brief instance between the time a
341 * deserialized lock is added to this feature instance and its doExtend() method
342 * updates its host and UUID to match this feature instance. If this happens to
343 * run during that brief instance, then the lock will be lost and the callback
344 * invoked. It isn't worth complicating this code further to handle those highly
349 try (Connection conn = dataSource.getConnection();
350 PreparedStatement stmt = conn.prepareStatement(
351 "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
354 stmt.setString(1, hostName);
355 stmt.setString(2, uuidString);
357 try (ResultSet resultSet = stmt.executeQuery()) {
358 while (resultSet.next()) {
359 String resourceId = resultSet.getString(1);
361 // we have now seen this resource id
362 expiredIds.remove(resourceId);
369 * Expires locks for the resources that no longer appear within the DB.
371 * @param expiredIds IDs of resources that have expired locks
373 private void expireLocks(Set<String> expiredIds) {
374 for (String resourceId : expiredIds) {
375 AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
377 resource2lock.computeIfPresent(resourceId, (key, lock) -> {
378 if (lock.isActive()) {
379 // it thinks it's active, but it isn't - remove from the map
387 DistributedLock lock = lockref.get();
389 logger.debug("removed lock from map {}", lock);
390 lock.deny(LOCK_LOST_MSG, false);
396 * Distributed Lock implementation.
398 public static class DistributedLock extends LockImpl {
399 private static final long serialVersionUID = 1L;
402 * Feature containing this lock. May be {@code null} until the feature is
403 * identified. Note: this can only be null if the lock has been de-serialized.
405 private transient DistributedLockManager feature;
408 * Host name from the feature instance that created this object. Replaced with the
409 * host name from the current feature instance whenever the lock is successfully
412 private String hostName;
415 * UUID string from the feature instance that created this object. Replaced with
416 * the UUID string from the current feature instance whenever the lock is
417 * successfully extended.
419 private String uuidString;
422 * {@code True} if the lock is busy making a request, {@code false} otherwise.
424 private transient boolean busy = false;
427 * Request to be performed.
429 private transient RunnableWithEx request = null;
432 * Number of times we've retried a request.
434 private transient int nretries = 0;
437 * Constructs the object.
439 public DistributedLock() {
441 this.uuidString = "";
/**
 * Creates a lock belonging to the given feature, stamping it with that feature's
 * host name and UUID string.
 *
 * @param state initial state of the lock
 * @param resourceId identifier of the resource to be locked
 * @param ownerKey information identifying the owner requesting the lock
 * @param holdSec amount of time, in seconds, for which the lock should be held,
 *        after which it will automatically be released
 * @param callback callback to be invoked once the lock is granted, or
 *        subsequently lost; must not be {@code null}
 * @param feature feature containing this lock
 */
public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
        DistributedLockManager feature) {
    super(state, resourceId, ownerKey, holdSec, callback);

    // identity of the feature instance that created this lock
    hostName = feature.hostName;
    uuidString = feature.uuidString;

    this.feature = feature;
}
466 * Grants this lock. The notification is <i>always</i> invoked via the
467 * <i>foreground</i> thread.
469 protected void grant() {
470 synchronized (this) {
471 if (isUnavailable()) {
475 setState(LockState.ACTIVE);
478 logger.info("lock granted: {}", this);
484 * Permanently denies this lock.
486 * @param reason the reason the lock was denied
487 * @param foreground {@code true} if the callback can be invoked in the current
488 * (i.e., foreground) thread, {@code false} if it should be invoked via the
491 protected void deny(String reason, boolean foreground) {
492 synchronized (this) {
493 setState(LockState.UNAVAILABLE);
496 logger.info("{}: {}", reason, this);
498 if (feature == null || foreground) {
502 feature.exsvc.execute(this::notifyUnavailable);
507 public boolean free() {
508 // do a quick check of the state
509 if (isUnavailable()) {
513 logger.info("releasing lock: {}", this);
515 if (!attachFeature()) {
516 setState(LockState.UNAVAILABLE);
520 AtomicBoolean result = new AtomicBoolean(false);
522 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
523 if (curlock == this && !isUnavailable()) {
524 // this lock was the owner
526 setState(LockState.UNAVAILABLE);
529 * NOTE: do NOT return null; curlock must remain until doUnlock
538 scheduleRequest(this::doUnlock);
546 public void extend(int holdSec, LockCallback callback) {
548 throw new IllegalArgumentException("holdSec is negative");
552 setCallback(callback);
554 // do a quick check of the state
555 if (isUnavailable() || !attachFeature()) {
556 deny(LOCK_LOST_MSG, true);
560 AtomicBoolean success = new AtomicBoolean(false);
562 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
563 if (curlock == this && !isUnavailable()) {
565 setState(LockState.WAITING);
568 // note: leave it in the map until doUnlock() removes it
574 scheduleRequest(this::doExtend);
577 deny(NOT_LOCKED_MSG, true);
582 * Attaches to the feature instance, if not already attached.
584 * @return {@code true} if the lock is now attached to a feature, {@code false}
587 private synchronized boolean attachFeature() {
588 if (feature != null) {
593 feature = latestInstance;
594 if (feature == null) {
595 logger.warn("no feature yet for {}", this);
599 // put this lock into the map
600 feature.resource2lock.putIfAbsent(getResourceId(), this);
606 * Schedules a request for execution.
608 * @param schedreq the request that should be scheduled
610 private synchronized void scheduleRequest(RunnableWithEx schedreq) {
611 logger.debug("schedule lock action {}", this);
614 feature.exsvc.execute(this::doRequest);
618 * Reschedules a request for execution, if there is not already a request in the
619 * queue, and if the retry count has not been exhausted.
621 * @param req request to be rescheduled
623 private void rescheduleRequest(RunnableWithEx req) {
624 synchronized (this) {
625 if (request != null) {
626 // a new request has already been scheduled - it supersedes "req"
627 logger.debug("not rescheduling lock action {}", this);
631 if (nretries++ < feature.featProps.getMaxRetries()) {
632 logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
634 feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
639 logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
644 * Gets, and removes, the next request from the queue. Clears {@link #busy} if
645 * there are no more requests in the queue.
647 * @param prevReq the previous request that was just run
649 * @return the next request, or {@code null} if the queue is empty
651 private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
652 if (request == null || request == prevReq) {
653 logger.debug("no more requests for {}", this);
658 RunnableWithEx req = request;
665 * Executes the current request, if none are currently executing.
667 private void doRequest() {
668 synchronized (this) {
670 // another thread is already processing the request(s)
677 * There is a race condition wherein this thread could invoke run() while the
678 * next scheduled thread checks the busy flag and finds that work is being
679 * done and returns, leaving the next work item in "request". In that case,
680 * the next work item may never be executed, thus we use a loop here, instead
681 * of just executing a single request.
683 RunnableWithEx req = null;
684 while ((req = getNextRequest(req)) != null) {
685 if (feature.resource2lock.get(getResourceId()) != this) {
687 * no longer in the map - don't apply the action, as it may interfere
688 * with any newly added Lock object
690 logger.debug("discard lock action {}", this);
691 synchronized (this) {
699 * Run the request. If it throws an exception, then it will be
700 * rescheduled for execution a little later.
704 } catch (SQLException e) {
705 logger.warn("request failed for lock: {}", this, e);
707 if (feature.featProps.isTransient(e.getErrorCode())) {
708 // retry the request a little later
709 rescheduleRequest(req);
714 } catch (RuntimeException e) {
715 logger.warn("request failed for lock: {}", this, e);
722 * Attempts to add a lock to the DB. Generates a callback, indicating success or
725 * @throws SQLException if a DB error occurs
727 private void doLock() throws SQLException {
729 logger.debug("discard doLock {}", this);
734 * There is a small window in which a client could invoke free() before the DB
735 * is updated. In that case, doUnlock will be added to the queue to run after
736 * this, which will delete the record, as desired. In addition, grant() will
737 * not do anything, because the lock state will have been set to UNAVAILABLE
741 logger.debug("doLock {}", this);
742 try (Connection conn = feature.dataSource.getConnection()) {
743 boolean success = false;
745 success = doDbInsert(conn);
747 } catch (SQLException e) {
748 logger.info("failed to insert lock record - attempting update: {}", this, e);
749 success = doDbUpdate(conn);
762 * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
763 * it fails, as this should only be executed in response to a call to
766 * @throws SQLException if a DB error occurs
768 private void doUnlock() throws SQLException {
769 logger.debug("unlock {}", this);
770 try (Connection conn = feature.dataSource.getConnection()) {
778 * Attempts to extend a lock in the DB. Generates a callback, indicating success
781 * @throws SQLException if a DB error occurs
783 private void doExtend() throws SQLException {
785 logger.debug("discard doExtend {}", this);
790 * There is a small window in which a client could invoke free() before the DB
791 * is updated. In that case, doUnlock will be added to the queue to run after
792 * this, which will delete the record, as desired. In addition, grant() will
793 * not do anything, because the lock state will have been set to UNAVAILABLE
797 logger.debug("doExtend {}", this);
798 try (Connection conn = feature.dataSource.getConnection()) {
800 * invoker may have called extend() before free() had a chance to insert
801 * the record, thus we have to try to insert, if the update fails
803 if (doDbUpdate(conn) || doDbInsert(conn)) {
813 * Inserts the lock into the DB.
815 * @param conn DB connection
816 * @return {@code true} if a record was successfully inserted, {@code false}
818 * @throws SQLException if a DB error occurs
820 protected boolean doDbInsert(Connection conn) throws SQLException {
821 logger.debug("insert lock record {}", this);
822 try (PreparedStatement stmt =
823 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
824 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
826 stmt.setString(1, getResourceId());
827 stmt.setString(2, feature.hostName);
828 stmt.setString(3, feature.uuidString);
829 stmt.setInt(4, getHoldSec());
831 stmt.executeUpdate();
833 this.hostName = feature.hostName;
834 this.uuidString = feature.uuidString;
841 * Updates the lock in the DB.
843 * @param conn DB connection
844 * @return {@code true} if a record was successfully updated, {@code false}
846 * @throws SQLException if a DB error occurs
848 protected boolean doDbUpdate(Connection conn) throws SQLException {
849 logger.debug("update lock record {}", this);
850 try (PreparedStatement stmt =
851 conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
852 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
853 + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
855 stmt.setString(1, getResourceId());
856 stmt.setString(2, feature.hostName);
857 stmt.setString(3, feature.uuidString);
858 stmt.setInt(4, getHoldSec());
860 stmt.setString(5, getResourceId());
861 stmt.setString(6, this.hostName);
862 stmt.setString(7, this.uuidString);
864 if (stmt.executeUpdate() != 1) {
868 this.hostName = feature.hostName;
869 this.uuidString = feature.uuidString;
876 * Deletes the lock from the DB.
878 * @param conn DB connection
879 * @throws SQLException if a DB error occurs
881 protected void doDbDelete(Connection conn) throws SQLException {
882 logger.debug("delete lock record {}", this);
883 try (PreparedStatement stmt =
884 conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
886 stmt.setString(1, getResourceId());
887 stmt.setString(2, this.hostName);
888 stmt.setString(3, this.uuidString);
890 stmt.executeUpdate();
895 * Removes the lock from the map, and sends a notification using the current
898 private void removeFromMap() {
899 logger.debug("remove lock from map {}", this);
900 feature.resource2lock.remove(getResourceId(), this);
902 synchronized (this) {
903 if (!isUnavailable()) {
904 deny(LOCK_LOST_MSG, true);
910 public String toString() {
911 return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
912 + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
918 private static interface RunnableWithEx {
919 void run() throws SQLException;
922 // these may be overridden by junit tests
924 protected Properties getProperties(String fileName) {
925 return SystemPersistenceConstants.getManager().getProperties(fileName);
928 protected ScheduledExecutorService getThreadPool() {
929 return engine.getExecutorService();
932 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
933 LockCallback callback) {
934 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);