2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import java.sql.Connection;
24 import java.sql.PreparedStatement;
25 import java.sql.ResultSet;
26 import java.sql.SQLException;
27 import java.sql.SQLTransientException;
28 import java.util.HashSet;
30 import java.util.Properties;
32 import java.util.UUID;
33 import java.util.concurrent.RejectedExecutionException;
34 import java.util.concurrent.ScheduledExecutorService;
35 import java.util.concurrent.ScheduledFuture;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import lombok.AccessLevel;
42 import org.apache.commons.dbcp2.BasicDataSource;
43 import org.apache.commons.dbcp2.BasicDataSourceFactory;
44 import org.onap.policy.drools.core.lock.LockCallback;
45 import org.onap.policy.drools.core.lock.LockState;
46 import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
47 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
48 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
49 import org.onap.policy.drools.system.PolicyEngine;
50 import org.onap.policy.drools.system.PolicyEngineConstants;
51 import org.onap.policy.drools.system.internal.FeatureLockImpl;
52 import org.onap.policy.drools.system.internal.LockManager;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
58 * Distributed implementation of the Lock Feature. Maintains locks across servers using a
62 * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
68 * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
69 * instead populated with the {@link #uuidString}.</li>
70 * <li>A periodic check of the DB is made to determine if any of the locks have
72 * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
73 * will be added to the map once free() or extend() is invoked, provided there isn't
74 * already an entry. In addition, it initially has the host and UUID of the feature
75 * instance that created it. However, as soon as doExtend() completes successfully, the
76 * host and UUID of the lock will be updated to reflect the values within this feature
80 public class DistributedLockManager extends LockManager<DistributedLockManager.DistributedLock>
81 implements PolicyEngineFeatureApi {
83 private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
85 private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
87 @Getter(AccessLevel.PROTECTED)
88 @Setter(AccessLevel.PROTECTED)
89 private static DistributedLockManager latestInstance = null;
93 * Name of the host on which this JVM is running.
96 private final String pdpName;
99 * UUID of this object.
102 private final String uuidString = UUID.randomUUID().toString();
105 * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
106 * lock is added to the map, it remains in the map until the lock is lost or until the
107 * unlock request completes.
109 private final Map<String, DistributedLock> resource2lock;
112 * Thread pool used to check for lock expiration and to notify owners when locks are
115 private ScheduledExecutorService exsvc = null;
118 * Used to cancel the expiration checker on shutdown.
120 private ScheduledFuture<?> checker = null;
123 * Feature properties.
125 private DistributedLockProperties featProps;
128 * Data source used to connect to the DB.
130 private BasicDataSource dataSource = null;
134 * Constructs the object.
136 public DistributedLockManager() {
137 this.pdpName = PolicyEngineConstants.PDP_NAME;
138 this.resource2lock = getResource2lock();
142 public int getSequenceNumber() {
147 public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
150 this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
151 this.dataSource = makeDataSource();
155 } catch (Exception e) {
156 throw new DistributedLockManagerException(e);
161 public boolean afterStart(PolicyEngine engine) {
164 exsvc = PolicyEngineConstants.getManager().getExecutorService();
165 exsvc.execute(this::deleteExpiredDbLocks);
166 checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
168 setLatestInstance(this);
170 } catch (Exception e) {
171 throw new DistributedLockManagerException(e);
180 * @return a new, pooled data source
181 * @throws Exception if the data source cannot be created from the properties
183 protected BasicDataSource makeDataSource() throws Exception {
184 Properties props = new Properties();
185 props.put("driverClassName", featProps.getDbDriver());
186 props.put("url", featProps.getDbUrl());
187 props.put("username", featProps.getDbUser());
188 props.put("password", featProps.getDbPwd());
189 props.put("testOnBorrow", "true");
190 props.put("poolPreparedStatements", "true");
192 // additional properties are listed in the GenericObjectPool API
194 return BasicDataSourceFactory.createDataSource(props);
198 * Deletes expired locks from the DB.
200 private void deleteExpiredDbLocks() {
201 logger.info("deleting all expired locks from the DB");
203 try (Connection conn = dataSource.getConnection();
204 PreparedStatement stmt = conn
205 .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
207 int ndel = stmt.executeUpdate();
208 logger.info("deleted {} expired locks from the DB", ndel);
210 } catch (SQLException e) {
211 logger.warn("failed to delete expired locks from the DB", e);
216 * Closes the data source. Does <i>not</i> invoke any lock call-backs.
219 public boolean afterStop(PolicyEngine engine) {
222 if (checker != null) {
223 checker.cancel(true);
231 * Closes {@link #dataSource} and sets it to {@code null}.
233 private void closeDataSource() {
235 if (dataSource != null) {
239 } catch (SQLException e) {
240 logger.error("cannot close the distributed locking DB", e);
247 protected boolean hasInstanceChanged() {
248 return (getLatestInstance() != this);
252 protected void finishLock(DistributedLock lock) {
253 lock.scheduleRequest(lock::doLock);
257 * Checks for expired locks.
259 private void checkExpired() {
261 logger.info("checking for expired locks");
262 Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
263 identifyDbLocks(expiredIds);
264 expireLocks(expiredIds);
266 checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
268 } catch (RejectedExecutionException e) {
269 logger.warn("thread pool is no longer accepting requests", e);
271 } catch (SQLException | RuntimeException e) {
272 logger.error("error checking expired locks", e);
275 checker = exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
279 logger.info("done checking for expired locks");
283 * Identifies this feature instance's locks that the DB indicates are still active.
285 * @param expiredIds IDs of resources that have expired locks. If a resource is still
286 * locked, its ID is removed from this set
287 * @throws SQLException if a DB error occurs
289 private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
291 * We could query for host and UUIDs that actually appear within the locks, but
292 * those might change while the query is running so no real value in doing that.
293 * On the other hand, there's only a brief instant between the time a
294 * deserialized lock is added to this feature instance and its doExtend() method
295 * updates its host and UUID to match this feature instance. If this happens to
296 * run during that brief instant, then the lock will be lost and the callback
297 * invoked. It isn't worth complicating this code further to handle those highly
302 try (Connection conn = dataSource.getConnection();
303 PreparedStatement stmt = conn.prepareStatement(
304 "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
307 stmt.setString(1, pdpName);
308 stmt.setString(2, uuidString);
310 try (ResultSet resultSet = stmt.executeQuery()) {
311 while (resultSet.next()) {
312 String resourceId = resultSet.getString(1);
314 // we have now seen this resource id
315 expiredIds.remove(resourceId);
322 * Expires locks for the resources that no longer appear within the DB.
324 * @param expiredIds IDs of resources that have expired locks
326 private void expireLocks(Set<String> expiredIds) {
327 for (String resourceId : expiredIds) {
328 AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
330 resource2lock.computeIfPresent(resourceId, (key, lock) -> {
331 if (lock.isActive()) {
332 // it thinks it's active, but it isn't - remove from the map
340 DistributedLock lock = lockref.get();
342 logger.debug("removed lock from map {}", lock);
343 lock.deny(FeatureLockImpl.LOCK_LOST_MSG);
349 * Distributed Lock implementation.
351 public static class DistributedLock extends FeatureLockImpl {
352 private static final String SQL_FAILED_MSG = "request failed for lock: {}";
354 private static final long serialVersionUID = 1L;
357 * Feature containing this lock. May be {@code null} until the feature is
358 * identified. Note: this can only be null if the lock has been de-serialized.
360 private transient DistributedLockManager feature;
363 * Host name from the feature instance that created this object. Replaced with the
364 * host name from the current feature instance whenever the lock is successfully
367 private String hostName;
370 * UUID string from the feature instance that created this object. Replaced with
371 * the UUID string from the current feature instance whenever the lock is
372 * successfully extended.
374 private String uuidString;
377 * {@code True} if the lock is busy making a request, {@code false} otherwise.
379 private transient boolean busy = false;
382 * Request to be performed.
384 private transient RunnableWithEx request = null;
387 * Number of times we've retried a request.
389 private transient int nretries = 0;
392 * Constructs the object.
394 public DistributedLock() {
397 this.uuidString = "";
401 * Constructs the object.
403 * @param state initial state of the lock
404 * @param resourceId identifier of the resource to be locked
405 * @param ownerKey information identifying the owner requesting the lock
406 * @param holdSec amount of time, in seconds, for which the lock should be held,
407 * after which it will automatically be released
408 * @param callback callback to be invoked once the lock is granted, or
409 * subsequently lost; must not be {@code null}
410 * @param feature feature containing this lock
412 public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
413 DistributedLockManager feature) {
414 super(state, resourceId, ownerKey, holdSec, callback);
416 this.feature = feature;
417 this.hostName = feature.pdpName;
418 this.uuidString = feature.uuidString;
422 public boolean free() {
423 if (!freeAllowed()) {
427 AtomicBoolean result = new AtomicBoolean(false);
429 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
430 if (curlock == this && !isUnavailable()) {
431 // this lock was the owner
433 setState(LockState.UNAVAILABLE);
436 * NOTE: do NOT return null; curlock must remain until doUnlock
445 scheduleRequest(this::doUnlock);
453 public void extend(int holdSec, LockCallback callback) {
454 if (!extendAllowed(holdSec, callback)) {
458 AtomicBoolean success = new AtomicBoolean(false);
460 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
461 if (curlock == this && !isUnavailable()) {
463 setState(LockState.WAITING);
466 // note: leave it in the map until doUnlock() removes it
472 scheduleRequest(this::doExtend);
475 deny(NOT_LOCKED_MSG);
480 protected boolean addToFeature() {
481 feature = getLatestInstance();
482 if (feature == null) {
483 logger.warn("no feature yet for {}", this);
487 // put this lock into the map
488 feature.resource2lock.putIfAbsent(getResourceId(), this);
494 * Schedules a request for execution.
496 * @param schedreq the request that should be scheduled
498 private synchronized void scheduleRequest(RunnableWithEx schedreq) {
499 logger.debug("schedule lock action {}", this);
502 getThreadPool().execute(this::doRequest);
506 * Reschedules a request for execution, if there is not already a request in the
507 * queue, and if the retry count has not been exhausted.
509 * @param req request to be rescheduled
511 private void rescheduleRequest(RunnableWithEx req) {
512 synchronized (this) {
513 if (request != null) {
514 // a new request has already been scheduled - it supersedes "req"
515 logger.debug("not rescheduling lock action {}", this);
519 if (nretries++ < feature.featProps.getMaxRetries()) {
520 logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
522 getThreadPool().schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
527 logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
532 * Gets, and removes, the next request from the queue. Clears {@link #busy} if
533 * there are no more requests in the queue.
535 * @param prevReq the previous request that was just run
537 * @return the next request, or {@code null} if the queue is empty
539 private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
540 if (request == null || request == prevReq) {
541 logger.debug("no more requests for {}", this);
546 RunnableWithEx req = request;
553 * Executes the current request, if none are currently executing.
555 private void doRequest() {
556 synchronized (this) {
558 // another thread is already processing the request(s)
565 * There is a race condition wherein this thread could invoke run() while the
566 * next scheduled thread checks the busy flag and finds that work is being
567 * done and returns, leaving the next work item in "request". In that case,
568 * the next work item may never be executed, thus we use a loop here, instead
569 * of just executing a single request.
571 RunnableWithEx req = null;
572 while ((req = getNextRequest(req)) != null) {
573 if (feature.resource2lock.get(getResourceId()) != this) {
575 * no longer in the map - don't apply the action, as it may interfere
576 * with any newly added Lock object
578 logger.debug("discard lock action {}", this);
579 synchronized (this) {
587 * Run the request. If it throws an exception, then it will be
588 * rescheduled for execution a little later.
592 } catch (SQLException e) {
593 logger.warn(SQL_FAILED_MSG, this, e);
595 if (e.getCause() instanceof SQLTransientException) {
596 // retry the request a little later
597 rescheduleRequest(req);
602 } catch (RuntimeException e) {
603 logger.warn(SQL_FAILED_MSG, this, e);
610 * Attempts to add a lock to the DB. Generates a callback, indicating success or
613 * @throws SQLException if a DB error occurs
615 private void doLock() throws SQLException {
617 logger.debug("discard doLock {}", this);
622 * There is a small window in which a client could invoke free() before the DB
623 * is updated. In that case, doUnlock will be added to the queue to run after
624 * this, which will delete the record, as desired. In addition, grant() will
625 * not do anything, because the lock state will have been set to UNAVAILABLE
629 logger.debug("doLock {}", this);
630 try (Connection conn = feature.dataSource.getConnection()) {
631 boolean success = false;
633 success = doDbInsert(conn);
635 } catch (SQLException e) {
636 logger.info("failed to insert lock record - attempting update: {}", this, e);
637 success = doDbUpdate(conn);
650 * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
651 * it fails, as this should only be executed in response to a call to
654 * @throws SQLException if a DB error occurs
656 private void doUnlock() throws SQLException {
657 logger.debug("unlock {}", this);
658 try (Connection conn = feature.dataSource.getConnection()) {
666 * Attempts to extend a lock in the DB. Generates a callback, indicating success
669 * @throws SQLException if a DB error occurs
671 private void doExtend() throws SQLException {
673 logger.debug("discard doExtend {}", this);
678 * There is a small window in which a client could invoke free() before the DB
679 * is updated. In that case, doUnlock will be added to the queue to run after
680 * this, which will delete the record, as desired. In addition, grant() will
681 * not do anything, because the lock state will have been set to UNAVAILABLE
685 logger.debug("doExtend {}", this);
686 try (Connection conn = feature.dataSource.getConnection()) {
688 * invoker may have called extend() before doLock() had a chance to insert
689 * the record, thus we have to try to insert, if the update fails
691 if (doDbUpdate(conn) || doDbInsert(conn)) {
701 * Inserts the lock into the DB.
703 * @param conn DB connection
704 * @return {@code true} if a record was successfully inserted, {@code false}
706 * @throws SQLException if a DB error occurs
708 protected boolean doDbInsert(Connection conn) throws SQLException {
709 logger.debug("insert lock record {}", this);
710 try (PreparedStatement stmt =
711 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
712 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
714 stmt.setString(1, getResourceId());
715 stmt.setString(2, feature.pdpName);
716 stmt.setString(3, feature.uuidString);
717 stmt.setInt(4, getHoldSec());
719 stmt.executeUpdate();
721 this.hostName = feature.pdpName;
722 this.uuidString = feature.uuidString;
729 * Updates the lock in the DB.
731 * @param conn DB connection
732 * @return {@code true} if a record was successfully updated, {@code false}
734 * @throws SQLException if a DB error occurs
736 protected boolean doDbUpdate(Connection conn) throws SQLException {
737 logger.debug("update lock record {}", this);
738 try (PreparedStatement stmt =
739 conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
740 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
741 + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
743 stmt.setString(1, getResourceId());
744 stmt.setString(2, feature.pdpName);
745 stmt.setString(3, feature.uuidString);
746 stmt.setInt(4, getHoldSec());
748 stmt.setString(5, getResourceId());
749 stmt.setString(6, this.hostName);
750 stmt.setString(7, this.uuidString);
752 if (stmt.executeUpdate() != 1) {
756 this.hostName = feature.pdpName;
757 this.uuidString = feature.uuidString;
764 * Deletes the lock from the DB.
766 * @param conn DB connection
767 * @throws SQLException if a DB error occurs
769 protected void doDbDelete(Connection conn) throws SQLException {
770 logger.debug("delete lock record {}", this);
771 try (PreparedStatement stmt = conn
772 .prepareStatement("DELETE FROM pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
774 stmt.setString(1, getResourceId());
775 stmt.setString(2, this.hostName);
776 stmt.setString(3, this.uuidString);
778 stmt.executeUpdate();
783 * Removes the lock from the map, and sends a notification using the current
786 private void removeFromMap() {
787 logger.debug("remove lock from map {}", this);
788 feature.resource2lock.remove(getResourceId(), this);
790 synchronized (this) {
791 if (!isUnavailable()) {
798 public String toString() {
799 return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
800 + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
806 private static interface RunnableWithEx {
807 void run() throws SQLException;
810 // these may be overridden by junit tests
812 protected Properties getProperties(String fileName) {
813 return SystemPersistenceConstants.getManager().getProperties(fileName);
816 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
817 LockCallback callback) {
818 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);