2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import java.sql.Connection;
24 import java.sql.PreparedStatement;
25 import java.sql.SQLException;
26 import java.sql.SQLTransientException;
27 import java.util.HashSet;
29 import java.util.Properties;
31 import java.util.UUID;
32 import java.util.concurrent.RejectedExecutionException;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.ScheduledFuture;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38 import lombok.AccessLevel;
41 import org.apache.commons.dbcp2.BasicDataSource;
42 import org.apache.commons.dbcp2.BasicDataSourceFactory;
43 import org.onap.policy.drools.core.lock.LockCallback;
44 import org.onap.policy.drools.core.lock.LockState;
45 import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
46 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
47 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
48 import org.onap.policy.drools.system.PolicyEngine;
49 import org.onap.policy.drools.system.PolicyEngineConstants;
50 import org.onap.policy.drools.system.internal.FeatureLockImpl;
51 import org.onap.policy.drools.system.internal.LockManager;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
57 * Distributed implementation of the Lock Feature. Maintains locks across servers using a
61 * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
67 * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
68 * instead populated with the {@link #uuidString}.</li>
69 * <li>A periodic check of the DB is made to determine if any of the locks have
71 * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
72 * will be added to the map once free() or extend() is invoked, provided there isn't
73 * already an entry. In addition, it initially has the host and UUID of the feature
74 * instance that created it. However, as soon as doExtend() completes successfully, the
75 * host and UUID of the lock will be updated to reflect the values within this feature
// Lock manager whose lock records are kept in the pooling.locks DB table (see the SQL
// statements in this file); it also implements PolicyEngineFeatureApi.
79 public class DistributedLockManager extends LockManager<DistributedLockManager.DistributedLock>
80 implements PolicyEngineFeatureApi {
82 private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
// Name handed to getProperties() when this feature's configuration is loaded.
84 private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
// Assigned "this" by afterStart(); hasInstanceChanged() compares against it and
// DistributedLock.addToFeature() adopts it as the lock's feature.
86 @Getter(AccessLevel.PROTECTED)
87 @Setter(AccessLevel.PROTECTED)
88 private static DistributedLockManager latestInstance = null;
92 * Name of the host on which this JVM is running.
// Initialized from PolicyEngineConstants.PDP_NAME; bound to the "host" column in the SQL.
95 private final String pdpName;
98 * UUID of this object.
// Generated once per instance; bound to the "owner" column in the SQL.
101 private final String uuidString = UUID.randomUUID().toString();
104 * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
105 * lock is added to the map, it remains in the map until the lock is lost or until the
106 * unlock request completes.
// Obtained from getResource2lock() in the constructor.
108 private final Map<String, DistributedLock> resource2lock;
111 * Thread pool used to check for lock expiration and to notify owners when locks are
// Obtained from the policy engine manager in afterStart(); null until then.
114 private ScheduledExecutorService exsvc = null;
117 * Used to cancel the expiration checker on shutdown.
// Re-assigned every time checkExpired() reschedules itself.
119 private ScheduledFuture<?> checker = null;
122 * Feature properties.
// Created in beforeCreateLockManager().
124 private DistributedLockProperties featProps;
127 * Data source used to connect to the DB.
// Created by makeDataSource() in beforeCreateLockManager().
129 private BasicDataSource dataSource = null;
133 * Constructs the object.
// Captures the PDP name as this instance's host identity and caches the map returned by
// getResource2lock() (not defined in this file; presumably inherited from LockManager).
135 public DistributedLockManager() {
136 this.pdpName = PolicyEngineConstants.PDP_NAME;
137 this.resource2lock = getResource2lock();
// NOTE(review): the body is not visible in this excerpt; presumably returns the ordering
// value used for this feature - confirm against the full source.
141 public int getSequenceNumber() {
// Loads the feature configuration and builds the DB connection pool. Any exception raised
// while doing so is wrapped in a DistributedLockManagerException.
// NOTE(review): neither "engine" nor "properties" is read by the lines visible here.
146 public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
// configuration comes from the named properties file, not from the "properties" argument
149 this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
150 this.dataSource = makeDataSource();
154 } catch (Exception e) {
155 throw new DistributedLockManagerException(e);
// Start-up hook: obtains the engine's executor, queues a one-time purge of expired DB
// locks, schedules the first expiration check and publishes this object as the latest
// instance. Any exception is wrapped in a DistributedLockManagerException.
160 public boolean afterStart(PolicyEngine engine) {
163 exsvc = PolicyEngineConstants.getManager().getExecutorService();
// the purge runs asynchronously on the executor, not on the calling thread
164 exsvc.execute(this::deleteExpiredDbLocks);
// one-shot schedule; checkExpired() re-schedules itself each time it runs
165 checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
167 setLatestInstance(this);
169 } catch (Exception e) {
170 throw new DistributedLockManagerException(e);
179 * @return a new, pooled data source
180 * @throws Exception exception
// Builds the DBCP data source from the feature properties: JDBC driver class, URL, user
// name and password, plus two fixed pool options.
182 protected BasicDataSource makeDataSource() throws Exception {
183 var props = new Properties();
184 props.put("driverClassName", featProps.getDbDriver());
185 props.put("url", featProps.getDbUrl());
186 props.put("username", featProps.getDbUser());
187 props.put("password", featProps.getDbPwd());
// fixed (non-configurable) pool options
188 props.put("testOnBorrow", "true");
189 props.put("poolPreparedStatements", "true");
191 // additional properties are listed in the GenericObjectPool API
193 return BasicDataSourceFactory.createDataSource(props);
197 * Deletes expired locks from the DB.
// Removes every row of pooling.locks whose expirationTime is not in the future. The
// DELETE has no host/owner filter, so expired rows written by any instance are removed.
// A SQLException is logged as a warning.
199 private void deleteExpiredDbLocks() {
200 logger.info("deleting all expired locks from the DB");
// connection and statement are both closed by try-with-resources
202 try (var conn = dataSource.getConnection();
203 var stmt = conn.prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
// executeUpdate() returns the number of deleted rows
205 int ndel = stmt.executeUpdate();
206 logger.info("deleted {} expired locks from the DB", ndel);
208 } catch (SQLException e) {
209 logger.warn("failed to delete expired locks from the DB", e);
214 * Closes the data source. Does <i>not</i> invoke any lock call-backs.
// Shutdown hook. The visible part cancels the pending expiration check, if one was ever
// scheduled; cancel(true) also allows a check that is currently running to be interrupted.
217 public boolean afterStop(PolicyEngine engine) {
// checker is still null if afterStart() never scheduled a check
220 if (checker != null) {
221 checker.cancel(true);
229 * Closes {@link #dataSource} and sets it to {@code null}.
// Acts only when a data source exists. A SQLException raised by the visible try block is
// logged at error level.
231 private void closeDataSource() {
233 if (dataSource != null) {
237 } catch (SQLException e) {
238 logger.error("cannot close the distributed locking DB", e);
// Reports whether a different manager object has become the latest instance since this
// one was started. Uses reference identity, not equals().
245 protected boolean hasInstanceChanged() {
246 return (getLatestInstance() != this);
// Completes creation of a lock by queueing its doLock() action, so the DB work runs
// through the lock's own request mechanism rather than inline.
250 protected void finishLock(DistributedLock lock) {
251 lock.scheduleRequest(lock::doLock);
255 * Checks for expired locks.
// Periodic task. Starts from the assumption that every resource in the map has expired,
// lets identifyDbLocks() strike the ones still held in the DB, expires the remainder and
// then schedules the next run.
257 private void checkExpired() {
259 logger.info("checking for expired locks");
// copy of the key set, so the map itself is untouched while the candidates are filtered
260 Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
261 identifyDbLocks(expiredIds);
262 expireLocks(expiredIds);
// normal case: next check after the regular interval
264 checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
// executor rejected the task: only logged here
266 } catch (RejectedExecutionException e) {
267 logger.warn("thread pool is no longer accepting requests", e);
// DB or unexpected runtime failure: log it, then try again after the retry interval
269 } catch (SQLException | RuntimeException e) {
270 logger.error("error checking expired locks", e);
273 checker = exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
277 logger.info("done checking for expired locks");
281 * Identifies this feature instance's locks that the DB indicates are still active.
283 * @param expiredIds IDs of resources that have expired locks. If a resource is still
284 * locked, its ID is removed from this set
285 * @throws SQLException if a DB error occurs
// Queries the DB for unexpired rows written by this instance (matching host and owner)
// and removes each returned resource id from the caller's candidate set. Whatever is left
// in the set afterwards has no live DB record for this instance.
287 private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
289 * We could query for host and UUIDs that actually appear within the locks, but
290 * those might change while the query is running so no real value in doing that.
291 * On the other hand, there's only a brief instance between the time a
292 * deserialized lock is added to this feature instance and its doExtend() method
293 * updates its host and UUID to match this feature instance. If this happens to
294 * run during that brief instance, then the lock will be lost and the callback
295 * invoked. It isn't worth complicating this code further to handle those highly
// connection, statement and result set are all closed by try-with-resources
300 try (var conn = dataSource.getConnection();
301 PreparedStatement stmt = conn.prepareStatement(
302 "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
// parameters are this manager's identity, not the per-lock hostName/uuidString
305 stmt.setString(1, pdpName);
306 stmt.setString(2, uuidString);
308 try (var resultSet = stmt.executeQuery()) {
309 while (resultSet.next()) {
310 var resourceId = resultSet.getString(1);
312 // we have now seen this resource id
313 expiredIds.remove(resourceId);
320 * Expires locks for the resources that no longer appear within the DB.
322 * @param expiredIds IDs of resources that have expired locks
// For each resource id, inspects the map entry via computeIfPresent and, for a lock
// obtained through lockref, logs it and notifies its owner with LOCK_LOST_MSG.
324 private void expireLocks(Set<String> expiredIds) {
325 for (String resourceId : expiredIds) {
// holder used to carry a lock out of the lambda
// NOTE(review): the statement that fills it is not visible in this excerpt
326 AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
328 resource2lock.computeIfPresent(resourceId, (key, lock) -> {
329 if (lock.isActive()) {
330 // it thinks it's active, but it isn't - remove from the map
338 DistributedLock lock = lockref.get();
340 logger.debug("removed lock from map {}", lock);
// tell the owner the lock has been lost
341 lock.deny(FeatureLockImpl.LOCK_LOST_MSG);
347 * Distributed Lock implementation.
// Lock whose state is mirrored by a row in pooling.locks. DB work is performed through a
// single pending "request" that is run on the thread pool (see doRequest()).
349 public static class DistributedLock extends FeatureLockImpl {
// log format shared by the two catch blocks in doRequest()
350 private static final String SQL_FAILED_MSG = "request failed for lock: {}";
352 private static final long serialVersionUID = 1L;
355 * Feature containing this lock. May be {@code null} until the feature is
356 * identified. Note: this can only be null if the lock has been de-serialized.
// transient: re-established from getLatestInstance() in addToFeature()
358 private transient DistributedLockManager feature;
361 * Host name from the feature instance that created this object. Replaced with the
362 * host name from the current feature instance whenever the lock is successfully
// not transient; bound to host=? in the WHERE clauses of doDbUpdate() and doDbDelete()
365 private String hostName;
368 * UUID string from the feature instance that created this object. Replaced with
369 * the UUID string from the current feature instance whenever the lock is
370 * successfully extended.
// not transient; bound to owner=? in the WHERE clauses of doDbUpdate() and doDbDelete()
372 private String uuidString;
375 * {@code True} if the lock is busy making a request, {@code false} otherwise.
377 private transient boolean busy = false;
380 * Request to be performed.
// read by getNextRequest() and rescheduleRequest() while holding this object's monitor
382 private transient RunnableWithEx request = null;
385 * Number of times we've retried a request.
// compared against featProps.getMaxRetries() in rescheduleRequest()
387 private transient int nretries = 0;
390 * Constructs the object.
// No-argument constructor; the visible part sets uuidString to the empty string.
// NOTE(review): other initialisation statements, if any, are not visible in this excerpt.
392 public DistributedLock() {
395 this.uuidString = "";
399 * Constructs the object.
401 * @param state initial state of the lock
402 * @param resourceId identifier of the resource to be locked
403 * @param ownerKey information identifying the owner requesting the lock
404 * @param holdSec amount of time, in seconds, for which the lock should be held,
405 * after which it will automatically be released
406 * @param callback callback to be invoked once the lock is granted, or
407 * subsequently lost; must not be {@code null}
408 * @param feature feature containing this lock
// Creates a lock bound to the given feature and stamps it with that feature's host name
// and UUID, which later identify the lock's DB row.
410 public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
411 DistributedLockManager feature) {
412 super(state, resourceId, ownerKey, holdSec, callback);
414 this.feature = feature;
// snapshot of the creating feature's identity
415 this.hostName = feature.pdpName;
416 this.uuidString = feature.uuidString;
// Releases the lock. After the freeAllowed() pre-check, the map entry is examined
// atomically: if this object is the current holder and not already unavailable, its state
// becomes UNAVAILABLE. The visible part then queues doUnlock() for the DB delete.
// NOTE(review): the return statements are not visible in this excerpt.
420 public boolean free() {
421 if (!freeAllowed()) {
// carries the outcome out of the lambda below
425 var result = new AtomicBoolean(false);
427 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
// identity check: only the object actually stored in the map may release the resource
428 if (curlock == this && !isUnavailable()) {
429 // this lock was the owner
431 setState(LockState.UNAVAILABLE);
434 * NOTE: do NOT return null; curlock must remain until doUnlock
443 scheduleRequest(this::doUnlock);
// Requests an extension of the lock. After the extendAllowed() pre-check, the map entry
// is examined atomically: if this object is the current holder and not unavailable, its
// state becomes WAITING. The visible part then queues doExtend() or denies the request
// with NOT_LOCKED_MSG; the branch structure between the two is not visible here.
451 public void extend(int holdSec, LockCallback callback) {
452 if (!extendAllowed(holdSec, callback)) {
// carries the outcome out of the lambda below
456 var success = new AtomicBoolean(false);
458 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
// identity check: only the object actually stored in the map may be extended
459 if (curlock == this && !isUnavailable()) {
461 setState(LockState.WAITING);
464 // note: leave it in the map until doUnlock() removes it
470 scheduleRequest(this::doExtend);
473 deny(NOT_LOCKED_MSG);
// Attaches this lock to the most recently started manager and registers it in that
// manager's map unless another lock already holds the resource. Logs a warning when no
// manager has been published yet.
478 protected boolean addToFeature() {
479 feature = getLatestInstance();
480 if (feature == null) {
481 logger.warn("no feature yet for {}", this);
485 // put this lock into the map
// putIfAbsent: an existing entry for the resource is left in place
486 feature.resource2lock.putIfAbsent(getResourceId(), this);
492 * Schedules a request for execution.
494 * @param schedreq the request that should be scheduled
// Submits doRequest() to the thread pool while holding this lock object's monitor.
// NOTE(review): the statements that record "schedreq" are not visible in this excerpt.
496 private synchronized void scheduleRequest(RunnableWithEx schedreq) {
497 logger.debug("schedule lock action {}", this);
500 getThreadPool().execute(this::doRequest);
504 * Reschedules a request for execution, if there is not already a request in the
505 * queue, and if the retry count has not been exhausted.
507 * @param req request to be rescheduled
// Retry path for a failed request: does nothing when a newer request is already pending,
// otherwise re-submits doRequest() after the retry delay while retries remain, and warns
// once the retry limit is reached.
509 private void rescheduleRequest(RunnableWithEx req) {
510 synchronized (this) {
511 if (request != null) {
512 // a new request has already been scheduled - it supersedes "req"
513 logger.debug("not rescheduling lock action {}", this);
// post-increment: compares the count before this attempt, so up to getMaxRetries()
// reschedules are permitted
517 if (nretries++ < feature.featProps.getMaxRetries()) {
518 logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
520 getThreadPool().schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
525 logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
530 * Gets, and removes, the next request from the queue. Clears {@link #busy} if
531 * there are no more requests in the queue.
533 * @param prevReq the previous request that was just run
535 * @return the next request, or {@code null} if the queue is empty
// Takes the pending request, if there is a new one, under this object's monitor.
537 private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
// a pending request that is the very object just run (identity comparison) is treated
// the same as "nothing pending"
538 if (request == null || request == prevReq) {
539 logger.debug("no more requests for {}", this);
544 RunnableWithEx req = request;
551 * Executes the current request, if none are currently executing.
// Worker entry point run on the thread pool. Drains pending requests in a loop; a request
// is discarded when this lock is no longer the map entry for its resource. Failures are
// logged with SQL_FAILED_MSG.
553 private void doRequest() {
554 synchronized (this) {
556 // another thread is already processing the request(s)
563 * There is a race condition wherein this thread could invoke run() while the
564 * next scheduled thread checks the busy flag and finds that work is being
565 * done and returns, leaving the next work item in "request". In that case,
566 * the next work item may never be executed, thus we use a loop here, instead
567 * of just executing a single request.
// passing the previous request lets getNextRequest() detect that nothing new arrived
569 RunnableWithEx req = null;
570 while ((req = getNextRequest(req)) != null) {
// identity check against the map entry
571 if (feature.resource2lock.get(getResourceId()) != this) {
573 * no longer in the map - don't apply the action, as it may interfere
574 * with any newly added Lock object
576 logger.debug("discard lock action {}", this);
577 synchronized (this) {
585 * Run the request. If it throws an exception, then it will be
586 * rescheduled for execution a little later.
590 } catch (SQLException e) {
591 logger.warn(SQL_FAILED_MSG, this, e);
// NOTE(review): only the exception's *cause* is tested. A SQLTransientException thrown
// directly (it is itself a SQLException) with no transient cause does not match, so it
// is not retried - confirm this is intended.
593 if (e.getCause() instanceof SQLTransientException) {
594 // retry the request a little later
595 rescheduleRequest(req);
600 } catch (RuntimeException e) {
601 logger.warn(SQL_FAILED_MSG, this, e);
608 * Attempts to add a lock to the DB. Generates a callback, indicating success or
611 * @throws SQLException if a DB error occurs
// DB side of acquiring the lock: tries to INSERT the row and, if the insert throws a
// SQLException, falls back to an UPDATE. The UPDATE matches only a row that this lock
// already owns or that has expired (see doDbUpdate()).
613 private void doLock() throws SQLException {
615 logger.debug("discard doLock {}", this);
620 * There is a small window in which a client could invoke free() before the DB
621 * is updated. In that case, doUnlock will be added to the queue to run after
622 * this, which will delete the record, as desired. In addition, grant() will
623 * not do anything, because the lock state will have been set to UNAVAILABLE
627 logger.debug("doLock {}", this);
// one connection is used for both the insert attempt and the fallback update
628 try (var conn = feature.dataSource.getConnection()) {
631 success = doDbInsert(conn);
// insert failure is logged at info level and handled by the update fallback
633 } catch (SQLException e) {
634 logger.info("failed to insert lock record - attempting update: {}", this, e);
635 success = doDbUpdate(conn);
648 * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
649 * it fails, as this should only be executed in response to a call to
652 * @throws SQLException if a DB error occurs
// DB side of releasing the lock; opens a connection that try-with-resources closes.
// NOTE(review): the statements inside the try block are not visible in this excerpt.
654 private void doUnlock() throws SQLException {
655 logger.debug("unlock {}", this);
656 try (var conn = feature.dataSource.getConnection()) {
664 * Attempts to extend a lock in the DB. Generates a callback, indicating success
667 * @throws SQLException if a DB error occurs
// DB side of extending the lock: tries to UPDATE the row and, when no row was updated,
// tries to INSERT one instead.
669 private void doExtend() throws SQLException {
671 logger.debug("discard doExtend {}", this);
676 * There is a small window in which a client could invoke free() before the DB
677 * is updated. In that case, doUnlock will be added to the queue to run after
678 * this, which will delete the record, as desired. In addition, grant() will
679 * not do anything, because the lock state will have been set to UNAVAILABLE
683 logger.debug("doExtend {}", this);
684 try (var conn = feature.dataSource.getConnection()) {
// NOTE(review): the comment below names free() as the method that inserts the record,
// but the only visible callers of doDbInsert() are doLock() and doExtend() - the
// wording probably means doLock(); confirm.
686 * invoker may have called extend() before free() had a chance to insert
687 * the record, thus we have to try to insert, if the update fails
// short-circuit: the insert is attempted only when the update returned false
689 if (doDbUpdate(conn) || doDbInsert(conn)) {
699 * Inserts the lock into the DB.
701 * @param conn DB connection
702 * @return {@code true} if a record was successfully inserted, {@code false}
704 * @throws SQLException if a DB error occurs
// Inserts this lock's row using the current feature's host name and UUID, with an
// expiration of now() plus the hold time in seconds, and then stamps the lock with that
// same identity.
706 protected boolean doDbInsert(Connection conn) throws SQLException {
707 logger.debug("insert lock record {}", this);
708 try (var stmt = conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
709 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
711 stmt.setString(1, getResourceId());
712 stmt.setString(2, feature.pdpName);
713 stmt.setString(3, feature.uuidString);
714 stmt.setInt(4, getHoldSec());
// a failed insert surfaces as a SQLException from this call
716 stmt.executeUpdate();
// reached only if the insert did not throw: the row now carries the current feature's identity
718 this.hostName = feature.pdpName;
719 this.uuidString = feature.uuidString;
726 * Updates the lock in the DB.
728 * @param conn DB connection
729 * @return {@code true} if a record was successfully updated, {@code false}
731 * @throws SQLException if a DB error occurs
// Rewrites this resource's row with the current feature's identity and a fresh expiration
// time. The WHERE clause matches only when the row still carries this lock's recorded
// host/owner, or when the row has already expired.
733 protected boolean doDbUpdate(Connection conn) throws SQLException {
734 logger.debug("update lock record {}", this);
735 try (var stmt = conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
736 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
737 + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
// parameters 1-4: new column values, taken from the current feature
739 stmt.setString(1, getResourceId());
740 stmt.setString(2, feature.pdpName);
741 stmt.setString(3, feature.uuidString);
742 stmt.setInt(4, getHoldSec());
// parameters 5-7: row selection, using the identity recorded in this lock
744 stmt.setString(5, getResourceId());
745 stmt.setString(6, this.hostName);
746 stmt.setString(7, this.uuidString);
// exactly one affected row is required
749 if (stmt.executeUpdate() != 1) {
// adopt the current feature's identity
752 this.hostName = feature.pdpName;
753 this.uuidString = feature.uuidString;
760 * Deletes the lock from the DB.
762 * @param conn DB connection
763 * @throws SQLException if a DB error occurs
// Deletes this lock's row. The DELETE is restricted to the resource id and the host/owner
// recorded in this lock; the affected-row count is not inspected.
765 protected void doDbDelete(Connection conn) throws SQLException {
766 logger.debug("delete lock record {}", this);
768 .prepareStatement("DELETE FROM pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
770 stmt.setString(1, getResourceId());
// the lock's recorded identity, not necessarily the current feature's
771 stmt.setString(2, this.hostName);
772 stmt.setString(3, this.uuidString);
774 stmt.executeUpdate();
779 * Removes the lock from the map, and sends a notification using the current
// Removes this lock from the feature's map and then, under this object's monitor, acts
// further only if the lock is not already unavailable.
// NOTE(review): the guarded statements are not visible in this excerpt.
782 private void removeFromMap() {
783 logger.debug("remove lock from map {}", this);
// two-argument remove: the entry is removed only if it currently maps to this lock
784 feature.resource2lock.remove(getResourceId(), this);
786 synchronized (this) {
787 if (!isUnavailable()) {
// Diagnostic rendering used by the log statements in this class; the visible part lists
// state, resource id, owner key, hold time, host name and UUID string.
794 public String toString() {
795 return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
796 + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
// Runnable-like callback that is allowed to throw SQLException; it is the type of the
// lock actions (doLock, doUnlock, doExtend) passed to scheduleRequest().
802 private static interface RunnableWithEx {
803 void run() throws SQLException;
806 // these may be overridden by junit tests
// Loads the named properties through the system persistence manager. Kept as a separate
// protected method so it can be overridden.
808 protected Properties getProperties(String fileName) {
809 return SystemPersistenceConstants.getManager().getProperties(fileName);
// Factory for this manager's lock type; the new lock is bound to this manager, whose host
// name and UUID it records.
812 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
813 LockCallback callback) {
814 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);