2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import java.sql.Connection;
24 import java.sql.PreparedStatement;
25 import java.sql.SQLException;
26 import java.sql.SQLTransientException;
27 import java.util.HashSet;
29 import java.util.Properties;
31 import java.util.UUID;
32 import java.util.concurrent.RejectedExecutionException;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.ScheduledFuture;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38 import lombok.AccessLevel;
41 import org.apache.commons.dbcp2.BasicDataSource;
42 import org.apache.commons.dbcp2.BasicDataSourceFactory;
43 import org.onap.policy.drools.core.lock.LockCallback;
44 import org.onap.policy.drools.core.lock.LockState;
45 import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
46 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
47 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
48 import org.onap.policy.drools.system.PolicyEngine;
49 import org.onap.policy.drools.system.PolicyEngineConstants;
50 import org.onap.policy.drools.system.internal.FeatureLockImpl;
51 import org.onap.policy.drools.system.internal.LockManager;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
57 * Distributed implementation of the Lock Feature. Maintains locks across servers using a shared DB.
61 * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
67 * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
68 * instead populated with the {@link #uuidString}.</li>
69 * <li>A periodic check of the DB is made to determine if any of the locks have expired.</li>
71 * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
72 * will be added to the map once free() or extend() is invoked, provided there isn't
73 * already an entry. In addition, it initially has the host and UUID of the feature
74 * instance that created it. However, as soon as doExtend() completes successfully, the
75 * host and UUID of the lock will be updated to reflect the values within this feature
79 public class DistributedLockManager extends LockManager<DistributedLockManager.DistributedLock>
80 implements PolicyEngineFeatureApi {
82 private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
84 private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
86 @Getter(AccessLevel.PROTECTED)
87 @Setter(AccessLevel.PROTECTED)
88 private static DistributedLockManager latestInstance = null;
91 * Name of the host on which this JVM is running.
94 private String pdpName;
97 * UUID of this object.
100 private final String uuidString = UUID.randomUUID().toString();
103 * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
104 * lock is added to the map, it remains in the map until the lock is lost or until the
105 * unlock request completes.
107 private final Map<String, DistributedLock> resource2lock;
110 * Thread pool used to check for lock expiration and to notify owners when locks are granted or lost.
113 private ScheduledExecutorService exsvc = null;
116 * Used to cancel the expiration checker on shutdown.
118 private ScheduledFuture<?> checker = null;
121 * Feature properties.
123 private DistributedLockProperties featProps;
126 * Data source used to connect to the DB.
128 private BasicDataSource dataSource = null;
/**
 * Creates the lock manager, caching a reference to the resource-to-lock map that is
 * returned by {@code getResource2lock()}.
 */
public DistributedLockManager() {
    resource2lock = getResource2lock();
}
139 public int getSequenceNumber() {
144 public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
147 this.pdpName = PolicyEngineConstants.getManager().getPdpName();
148 this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
149 this.dataSource = makeDataSource();
153 } catch (Exception e) {
154 throw new DistributedLockManagerException(e);
159 public boolean afterStart(PolicyEngine engine) {
162 exsvc = PolicyEngineConstants.getManager().getExecutorService();
163 exsvc.execute(this::deleteExpiredDbLocks);
164 checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
166 setLatestInstance(this);
168 } catch (Exception e) {
169 throw new DistributedLockManagerException(e);
178 * @return a new, pooled data source
179 * @throws Exception exception
181 protected BasicDataSource makeDataSource() throws Exception {
182 var props = new Properties();
183 props.put("driverClassName", featProps.getDbDriver());
184 props.put("url", featProps.getDbUrl());
185 props.put("username", featProps.getDbUser());
186 props.put("password", featProps.getDbPwd());
187 props.put("testOnBorrow", "true");
188 props.put("poolPreparedStatements", "true");
190 // additional properties are listed in the GenericObjectPool API
192 return BasicDataSourceFactory.createDataSource(props);
196 * Deletes expired locks from the DB.
198 private void deleteExpiredDbLocks() {
199 logger.info("deleting all expired locks from the DB");
201 try (var conn = dataSource.getConnection();
202 var stmt = conn.prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
204 int ndel = stmt.executeUpdate();
205 logger.info("deleted {} expired locks from the DB", ndel);
207 } catch (SQLException e) {
208 logger.warn("failed to delete expired locks from the DB", e);
213 * Closes the data source. Does <i>not</i> invoke any lock call-backs.
216 public boolean afterStop(PolicyEngine engine) {
219 if (checker != null) {
220 checker.cancel(true);
228 * Closes {@link #dataSource} and sets it to {@code null}.
230 private void closeDataSource() {
232 if (dataSource != null) {
236 } catch (SQLException e) {
237 logger.error("cannot close the distributed locking DB", e);
244 protected boolean hasInstanceChanged() {
245 return (getLatestInstance() != this);
249 protected void finishLock(DistributedLock lock) {
250 lock.scheduleRequest(lock::doLock);
254 * Checks for expired locks.
256 private void checkExpired() {
258 Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
259 logger.info("checking for expired locks: {}", this);
261 identifyDbLocks(expiredIds);
262 expireLocks(expiredIds);
264 checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
266 } catch (RejectedExecutionException e) {
267 logger.warn("thread pool is no longer accepting requests", e);
269 } catch (SQLException | RuntimeException e) {
270 logger.error("error checking expired locks", e);
273 checker = exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
277 logger.info("done checking for expired locks");
281 * Identifies this feature instance's locks that the DB indicates are still active.
283 * @param expiredIds IDs of resources that have expired locks. If a resource is still
284 * locked, it's ID is removed from this set
285 * @throws SQLException if a DB error occurs
287 private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
289 * We could query for host and UUIDs that actually appear within the locks, but
290 * those might change while the query is running so no real value in doing that.
291 * On the other hand, there's only a brief instance between the time a
292 * deserialized lock is added to this feature instance and its doExtend() method
293 * updates its host and UUID to match this feature instance. If this happens to
294 * run during that brief instance, then the lock will be lost and the callback
295 * invoked. It isn't worth complicating this code further to handle those highly
300 try (var conn = dataSource.getConnection();
301 PreparedStatement stmt = conn.prepareStatement(
302 "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
305 stmt.setString(1, pdpName);
306 stmt.setString(2, uuidString);
308 try (var resultSet = stmt.executeQuery()) {
309 while (resultSet.next()) {
310 var resourceId = resultSet.getString(1);
312 // we have now seen this resource id
313 expiredIds.remove(resourceId);
320 * Expires locks for the resources that no longer appear within the DB.
322 * @param expiredIds IDs of resources that have expired locks
324 private void expireLocks(Set<String> expiredIds) {
325 for (String resourceId : expiredIds) {
326 AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
328 resource2lock.computeIfPresent(resourceId, (key, lock) -> {
329 if (lock.isActive()) {
330 // it thinks it's active, but it isn't - remove from the map
338 DistributedLock lock = lockref.get();
340 logger.info("lost lock: removed lock from map {}", lock);
341 lock.deny(FeatureLockImpl.LOCK_LOST_MSG);
347 * Distributed Lock implementation.
349 public static class DistributedLock extends FeatureLockImpl {
350 private static final String SQL_FAILED_MSG = "request failed for lock: {}";
352 private static final long serialVersionUID = 1L;
355 * Feature containing this lock. May be {@code null} until the feature is
356 * identified. Note: this can only be null if the lock has been de-serialized.
358 private transient DistributedLockManager feature;
361 * Host name from the feature instance that created this object. Replaced with the
362 * host name from the current feature instance whenever the lock is successfully
365 private String hostName;
368 * UUID string from the feature instance that created this object. Replaced with
369 * the UUID string from the current feature instance whenever the lock is
370 * successfully extended.
372 private String uuidString;
375 * {@code True} if the lock is busy making a request, {@code false} otherwise.
377 private transient boolean busy = false;
380 * Request to be performed.
382 private transient RunnableWithEx request = null;
385 * Number of times we've retried a request.
387 private transient int nretries = 0;
390 * Constructs the object.
392 public DistributedLock() {
395 this.uuidString = "";
/**
 * Constructs a lock that belongs to the given feature instance. The lock's host name
 * and UUID string are copied from that feature instance.
 *
 * @param state initial state of the lock
 * @param resourceId identifier of the resource to be locked
 * @param ownerKey information identifying the owner requesting the lock
 * @param holdSec amount of time, in seconds, for which the lock should be held,
 *        after which it will automatically be released
 * @param callback callback to be invoked once the lock is granted, or
 *        subsequently lost; must not be {@code null}
 * @param feature feature containing this lock
 */
public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
                DistributedLockManager feature) {
    super(state, resourceId, ownerKey, holdSec, callback);

    // remember the owning feature, then stamp the lock with that feature's identity
    this.feature = feature;
    this.uuidString = feature.uuidString;
    this.hostName = feature.pdpName;
}
420 public boolean free() {
421 if (!freeAllowed()) {
425 var result = new AtomicBoolean(false);
427 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
428 if (curlock == this && !isUnavailable()) {
429 // this lock was the owner
431 setState(LockState.UNAVAILABLE);
434 * NOTE: do NOT return null; curlock must remain until doUnlock
443 scheduleRequest(this::doUnlock);
451 public void extend(int holdSec, LockCallback callback) {
452 if (!extendAllowed(holdSec, callback)) {
456 var success = new AtomicBoolean(false);
458 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
459 if (curlock == this && !isUnavailable()) {
461 setState(LockState.WAITING);
464 // note: leave it in the map until doUnlock() removes it
470 scheduleRequest(this::doExtend);
473 deny(NOT_LOCKED_MSG);
478 protected boolean addToFeature() {
479 feature = getLatestInstance();
480 if (feature == null) {
481 logger.warn("no feature yet for {}", this);
485 // put this lock into the map
486 feature.resource2lock.putIfAbsent(getResourceId(), this);
492 * Schedules a request for execution.
494 * @param schedreq the request that should be scheduled
496 private synchronized void scheduleRequest(RunnableWithEx schedreq) {
497 logger.debug("schedule lock action {}", this);
500 getThreadPool().execute(this::doRequest);
504 * Reschedules a request for execution, if there is not already a request in the
505 * queue, and if the retry count has not been exhausted.
507 * @param req request to be rescheduled
509 private void rescheduleRequest(RunnableWithEx req) {
510 synchronized (this) {
511 if (request != null) {
512 // a new request has already been scheduled - it supersedes "req"
513 logger.debug("not rescheduling lock action {}", this);
517 if (nretries++ < feature.featProps.getMaxRetries()) {
518 logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
520 getThreadPool().schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
525 logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
530 * Gets, and removes, the next request from the queue. Clears {@link #busy} if
531 * there are no more requests in the queue.
533 * @param prevReq the previous request that was just run
535 * @return the next request, or {@code null} if the queue is empty
537 private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
538 if (request == null || request == prevReq) {
539 logger.debug("no more requests for {}", this);
544 RunnableWithEx req = request;
551 * Executes the current request, if none are currently executing.
553 private void doRequest() {
554 synchronized (this) {
556 // another thread is already processing the request(s)
563 * There is a race condition wherein this thread could invoke run() while the
564 * next scheduled thread checks the busy flag and finds that work is being
565 * done and returns, leaving the next work item in "request". In that case,
566 * the next work item may never be executed, thus we use a loop here, instead
567 * of just executing a single request.
569 RunnableWithEx req = null;
570 while ((req = getNextRequest(req)) != null) {
571 if (feature.resource2lock.get(getResourceId()) != this) {
573 * no longer in the map - don't apply the action, as it may interfere
574 * with any newly added Lock object
576 logger.debug("discard lock action {}", this);
577 synchronized (this) {
585 * Run the request. If it throws an exception, then it will be
586 * rescheduled for execution a little later.
590 } catch (SQLException e) {
591 logger.warn(SQL_FAILED_MSG, this, e);
593 if (e.getCause() instanceof SQLTransientException) {
594 // retry the request a little later
595 rescheduleRequest(req);
600 } catch (RuntimeException e) {
601 logger.warn(SQL_FAILED_MSG, this, e);
608 * Attempts to add a lock to the DB. Generates a callback, indicating success or
611 * @throws SQLException if a DB error occurs
613 private void doLock() throws SQLException {
615 logger.debug("discard doLock {}", this);
620 * There is a small window in which a client could invoke free() before the DB
621 * is updated. In that case, doUnlock will be added to the queue to run after
622 * this, which will delete the record, as desired. In addition, grant() will
623 * not do anything, because the lock state will have been set to UNAVAILABLE
627 logger.debug("doLock {}", this);
628 try (var conn = feature.dataSource.getConnection()) {
631 success = doDbInsert(conn);
633 } catch (SQLException e) {
634 logger.info("failed to insert lock record - attempting update: {}", this, e);
635 success = doDbUpdate(conn);
648 * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
649 * it fails, as this should only be executed in response to a call to
652 * @throws SQLException if a DB error occurs
654 private void doUnlock() throws SQLException {
655 logger.debug("unlock {}", this);
656 try (var conn = feature.dataSource.getConnection()) {
664 * Attempts to extend a lock in the DB. Generates a callback, indicating success
667 * @throws SQLException if a DB error occurs
669 private void doExtend() throws SQLException {
671 logger.debug("discard doExtend {}", this);
676 * There is a small window in which a client could invoke free() before the DB
677 * is updated. In that case, doUnlock will be added to the queue to run after
678 * this, which will delete the record, as desired. In addition, grant() will
679 * not do anything, because the lock state will have been set to UNAVAILABLE
683 logger.debug("doExtend {}", this);
684 try (var conn = feature.dataSource.getConnection()) {
686 * invoker may have called extend() before free() had a chance to insert
687 * the record, thus we have to try to insert, if the update fails
689 if (doDbUpdate(conn) || doDbInsert(conn)) {
699 * Inserts the lock into the DB.
701 * @param conn DB connection
702 * @return {@code true} if a record was successfully inserted, {@code false}
704 * @throws SQLException if a DB error occurs
706 protected boolean doDbInsert(Connection conn) throws SQLException {
707 logger.info("insert lock record {}", this);
708 try (var stmt = conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
709 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
711 stmt.setString(1, getResourceId());
712 stmt.setString(2, feature.pdpName);
713 stmt.setString(3, feature.uuidString);
714 stmt.setInt(4, getHoldSec());
716 stmt.executeUpdate();
718 this.hostName = feature.pdpName;
719 this.uuidString = feature.uuidString;
726 * Updates the lock in the DB.
728 * @param conn DB connection
729 * @return {@code true} if a record was successfully updated, {@code false}
731 * @throws SQLException if a DB error occurs
733 protected boolean doDbUpdate(Connection conn) throws SQLException {
734 logger.info("update lock record {}", this);
735 try (var stmt = conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
736 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
737 + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
739 stmt.setString(1, getResourceId());
740 stmt.setString(2, feature.pdpName);
741 stmt.setString(3, feature.uuidString);
742 stmt.setInt(4, getHoldSec());
744 stmt.setString(5, getResourceId());
745 stmt.setString(6, this.hostName);
746 stmt.setString(7, this.uuidString);
748 if (stmt.executeUpdate() != 1) {
752 this.hostName = feature.pdpName;
753 this.uuidString = feature.uuidString;
760 * Deletes the lock from the DB.
762 * @param conn DB connection
763 * @throws SQLException if a DB error occurs
765 protected void doDbDelete(Connection conn) throws SQLException {
766 logger.info("delete lock record {}", this);
768 .prepareStatement("DELETE FROM pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
770 stmt.setString(1, getResourceId());
771 stmt.setString(2, this.hostName);
772 stmt.setString(3, this.uuidString);
774 stmt.executeUpdate();
779 * Removes the lock from the map, and sends a notification using the current
782 private void removeFromMap() {
783 logger.info("remove lock from map {}", this);
784 feature.resource2lock.remove(getResourceId(), this);
786 synchronized (this) {
787 if (!isUnavailable()) {
794 public String toString() {
795 return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
796 + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
802 private interface RunnableWithEx {
803 void run() throws SQLException;
807 public String toString() {
808 return "DistributedLockManager [" + "pdpName=" + pdpName + ", uuidString=" + uuidString
809 + ", resource2lock=" + resource2lock + "]";
812 // these may be overridden by junit tests
814 protected Properties getProperties(String fileName) {
815 return SystemPersistenceConstants.getManager().getProperties(fileName);
818 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
819 LockCallback callback) {
820 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);