2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import java.sql.Connection;
24 import java.sql.PreparedStatement;
25 import java.sql.ResultSet;
26 import java.sql.SQLException;
27 import java.sql.SQLTransientException;
28 import java.util.HashSet;
30 import java.util.Properties;
32 import java.util.UUID;
33 import java.util.concurrent.RejectedExecutionException;
34 import java.util.concurrent.ScheduledExecutorService;
35 import java.util.concurrent.ScheduledFuture;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import lombok.AccessLevel;
42 import org.apache.commons.dbcp2.BasicDataSource;
43 import org.apache.commons.dbcp2.BasicDataSourceFactory;
44 import org.onap.policy.common.utils.network.NetworkUtil;
45 import org.onap.policy.drools.core.lock.LockCallback;
46 import org.onap.policy.drools.core.lock.LockState;
47 import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
48 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
49 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
50 import org.onap.policy.drools.system.PolicyEngine;
51 import org.onap.policy.drools.system.PolicyEngineConstants;
52 import org.onap.policy.drools.system.internal.FeatureLockImpl;
53 import org.onap.policy.drools.system.internal.LockManager;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
59 * Distributed implementation of the Lock Feature. Maintains locks across servers using a shared DB.
63 * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
69 * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
70 * instead populated with the {@link #uuidString}.</li>
71 * <li>A periodic check of the DB is made to determine if any of the locks have expired.</li>
73 * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
74 * will be added to the map once free() or extend() is invoked, provided there isn't
75 * already an entry. In addition, it initially has the host and UUID of the feature
76 * instance that created it. However, as soon as doExtend() completes successfully, the
77 * host and UUID of the lock will be updated to reflect the values within this feature instance.</li>
81 public class DistributedLockManager extends LockManager<DistributedLockManager.DistributedLock>
82 implements PolicyEngineFeatureApi {
84 private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
// name passed to getProperties() to load this feature's configuration
86 private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
// Most recently started instance of this feature: set by afterStart(), read by
// hasInstanceChanged() and DistributedLock.addToFeature(). Lombok generates the
// protected static getLatestInstance()/setLatestInstance() accessors.
88 @Getter(AccessLevel.PROTECTED)
89 @Setter(AccessLevel.PROTECTED)
90 private static DistributedLockManager latestInstance = null;
94 * Name of the host on which this JVM is running.
// set from NetworkUtil.getHostname() in the constructor; written to the DB "host" column
97 private final String hostName;
100 * UUID of this object.
// written to the DB "owner" column in place of the lock's own owner info
103 private final String uuidString = UUID.randomUUID().toString();
106 * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
107 * lock is added to the map, it remains in the map until the lock is lost or until the
108 * unlock request completes.
// obtained from getResource2lock() in the constructor
110 private final Map<String, DistributedLock> resource2lock;
113 * Thread pool used to check for lock expiration and to notify owners when locks are
// obtained from the policy engine manager's executor service in afterStart()
116 private ScheduledExecutorService exsvc = null;
119 * Used to cancel the expiration checker on shutdown.
// NOTE(review): reassigned by checkExpired() on a pool thread and read by afterStop();
// the field is not volatile - confirm afterStop() is guaranteed to see the latest future.
121 private ScheduledFuture<?> checker = null;
124 * Feature properties.
// loaded in beforeCreateLockManager()
126 private DistributedLockProperties featProps;
129 * Data source used to connect to the DB.
// created via makeDataSource() in beforeCreateLockManager(); released by closeDataSource()
131 private BasicDataSource dataSource = null;
/**
 * Creates the feature. Records the name of the local host and takes the
 * resource-to-lock map from {@code getResource2lock()}.
 */
public DistributedLockManager() {
    hostName = NetworkUtil.getHostname();
    resource2lock = getResource2lock();
}
// NOTE(review): presumably defines this feature's position in the PolicyEngineFeatureApi
// invocation order - confirm against that API.
143 public int getSequenceNumber() {
148 public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
// Load this feature's configuration and build the pooled DB data source; any failure
// is wrapped in a DistributedLockManagerException.
151 this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
152 this.dataSource = makeDataSource();
156 } catch (Exception e) {
157 throw new DistributedLockManagerException(e);
162 public boolean afterStart(PolicyEngine engine) {
// Use the engine's shared executor: purge expired DB records in the background and
// start the periodic expiration checker (which reschedules itself).
165 exsvc = PolicyEngineConstants.getManager().getExecutorService();
166 exsvc.execute(this::deleteExpiredDbLocks);
167 checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
// publish this object as the latest feature instance (consulted by hasInstanceChanged()
// and DistributedLock.addToFeature())
169 setLatestInstance(this);
171 } catch (Exception e) {
172 throw new DistributedLockManagerException(e);
181 * @return a new, pooled data source
182 * @throws Exception exception
184 protected BasicDataSource makeDataSource() throws Exception {
185 Properties props = new Properties();
186 props.put("driverClassName", featProps.getDbDriver());
187 props.put("url", featProps.getDbUrl());
188 props.put("username", featProps.getDbUser());
189 props.put("password", featProps.getDbPwd());
190 props.put("testOnBorrow", "true");
191 props.put("poolPreparedStatements", "true");
193 // additional properties are listed in the GenericObjectPool API
195 return BasicDataSourceFactory.createDataSource(props);
199 * Deletes expired locks from the DB.
201 private void deleteExpiredDbLocks() {
202 logger.info("deleting all expired locks from the DB");
204 try (Connection conn = dataSource.getConnection();
205 PreparedStatement stmt = conn
206 .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
208 int ndel = stmt.executeUpdate();
209 logger.info("deleted {} expired locks from the DB", ndel);
211 } catch (SQLException e) {
212 logger.warn("failed to delete expired locks from the DB", e);
217 * Closes the data source. Does <i>not</i> invoke any lock call-backs.
220 public boolean afterStop(PolicyEngine engine) {
// Cancel the pending expiration check, interrupting it if it is currently running.
// NOTE(review): a checkExpired() run that is already executing reschedules itself and
// overwrites "checker"; confirm that a check cannot be scheduled again after this cancel.
223 if (checker != null) {
224 checker.cancel(true);
232 * Closes {@link #dataSource} and sets it to {@code null}.
234 private void closeDataSource() {
// a failure to close is logged, not propagated
236 if (dataSource != null) {
240 } catch (SQLException e) {
241 logger.error("cannot close the distributed locking DB", e);
248 protected boolean hasInstanceChanged() {
249 return (getLatestInstance() != this);
253 protected void finishLock(DistributedLock lock) {
254 lock.scheduleRequest(lock::doLock);
258 * Checks for expired locks.
260 private void checkExpired() {
// Start with every resource in the map, drop those the DB still shows as held by this
// instance, expire the remainder, then schedule the next check.
262 logger.info("checking for expired locks");
263 Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
264 identifyDbLocks(expiredIds);
265 expireLocks(expiredIds);
267 checker = exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
269 } catch (RejectedExecutionException e) {
// the pool refused the next check, so no further check is scheduled from here
270 logger.warn("thread pool is no longer accepting requests", e);
272 } catch (SQLException | RuntimeException e) {
273 logger.error("error checking expired locks", e);
// after a failure, retry using the configured retry interval rather than the check interval
// NOTE(review): this schedule() call runs outside the try; a RejectedExecutionException
// thrown here would escape the task - confirm it is guarded.
276 checker = exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
280 logger.info("done checking for expired locks");
284 * Identifies this feature instance's locks that the DB indicates are still active.
286 * @param expiredIds IDs of resources that have expired locks. If a resource is still
287 * locked, it's ID is removed from this set
288 * @throws SQLException if a DB error occurs
290 private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
292 * We could query for host and UUIDs that actually appear within the locks, but
293 * those might change while the query is running so no real value in doing that.
294 * On the other hand, there's only a brief instance between the time a
295 * deserialized lock is added to this feature instance and its doExtend() method
296 * updates its host and UUID to match this feature instance. If this happens to
297 * run during that brief instance, then the lock will be lost and the callback
298 * invoked. It isn't worth complicating this code further to handle those highly
303 try (Connection conn = dataSource.getConnection();
304 PreparedStatement stmt = conn.prepareStatement(
305 "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
308 stmt.setString(1, hostName);
309 stmt.setString(2, uuidString);
311 try (ResultSet resultSet = stmt.executeQuery()) {
312 while (resultSet.next()) {
313 String resourceId = resultSet.getString(1);
315 // we have now seen this resource id
316 expiredIds.remove(resourceId);
323 * Expires locks for the resources that no longer appear within the DB.
325 * @param expiredIds IDs of resources that have expired locks
327 private void expireLocks(Set<String> expiredIds) {
328 for (String resourceId : expiredIds) {
// captures the lock taken out of the map inside computeIfPresent(), so that it can be
// denied after the map operation has completed
329 AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
331 resource2lock.computeIfPresent(resourceId, (key, lock) -> {
// only a lock that believes it is active is treated as lost
332 if (lock.isActive()) {
333 // it thinks it's active, but it isn't - remove from the map
341 DistributedLock lock = lockref.get();
343 logger.debug("removed lock from map {}", lock);
// deny() runs outside computeIfPresent(), presumably so that any owner callback does
// not execute while the map entry is being updated
344 lock.deny(DistributedLock.LOCK_LOST_MSG, false);
350 * Distributed Lock implementation.
352 public static class DistributedLock extends FeatureLockImpl {
353 private static final String SQL_FAILED_MSG = "request failed for lock: {}";
355 private static final long serialVersionUID = 1L;
358 * Feature containing this lock. May be {@code null} until the feature is
359 * identified. Note: this can only be null if the lock has been de-serialized.
361 private transient DistributedLockManager feature;
364 * Host name from the feature instance that created this object. Replaced with the
365 * host name from the current feature instance whenever the lock is successfully
368 private String hostName;
371 * UUID string from the feature instance that created this object. Replaced with
372 * the UUID string from the current feature instance whenever the lock is
373 * successfully extended.
375 private String uuidString;
378 * {@code True} if the lock is busy making a request, {@code false} otherwise.
380 private transient boolean busy = false;
383 * Request to be performed.
385 private transient RunnableWithEx request = null;
388 * Number of times we've retried a request.
390 private transient int nretries = 0;
393 * Constructs the object.
395 public DistributedLock() {
// NOTE(review): presumably used only where a no-arg constructor is required (e.g.,
// frameworks/de-serialization); "feature" is expected to be null for such locks until
// addToFeature() attaches one - confirm.
398 this.uuidString = "";
/**
 * Creates a lock belonging to the given feature. The lock starts out identified by
 * that feature's host name and UUID.
 *
 * @param state initial state of the lock
 * @param resourceId identifier of the resource to be locked
 * @param ownerKey information identifying the owner requesting the lock
 * @param holdSec amount of time, in seconds, for which the lock should be held,
 *        after which it will automatically be released
 * @param callback callback to be invoked once the lock is granted, or
 *        subsequently lost; must not be {@code null}
 * @param feature feature containing this lock
 */
public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
        DistributedLockManager feature) {
    super(state, resourceId, ownerKey, holdSec, callback);

    // remember the owning feature and adopt its identity for the DB record
    this.feature = feature;
    this.hostName = feature.hostName;
    this.uuidString = feature.uuidString;
}
423 public boolean free() {
424 if (!freeAllowed()) {
428 AtomicBoolean result = new AtomicBoolean(false);
// Atomically mark this lock UNAVAILABLE if it is still the map's entry for its
// resource. doUnlock(), scheduled below, removes the DB record on the thread pool.
430 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
431 if (curlock == this && !isUnavailable()) {
432 // this lock was the owner
434 setState(LockState.UNAVAILABLE);
437 * NOTE: do NOT return null; curlock must remain until doUnlock
446 scheduleRequest(this::doUnlock);
454 public void extend(int holdSec, LockCallback callback) {
455 if (!extendAllowed(holdSec, callback)) {
459 AtomicBoolean success = new AtomicBoolean(false);
// Atomically move this lock back to WAITING if it is still the map's entry for its
// resource. doExtend(), scheduled below, then updates (or, failing that, inserts)
// the DB record.
461 feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
462 if (curlock == this && !isUnavailable()) {
464 setState(LockState.WAITING);
467 // note: leave it in the map until doUnlock() removes it
473 scheduleRequest(this::doExtend);
// otherwise (presumably when "success" was not set) the caller is told the lock is not held
476 deny(NOT_LOCKED_MSG, true);
481 protected boolean addToFeature() {
// attach this lock to the most recently started feature instance, if there is one yet
482 feature = getLatestInstance();
483 if (feature == null) {
484 logger.warn("no feature yet for {}", this);
488 // put this lock into the map
// putIfAbsent(): an existing entry for the resource is left in place
489 feature.resource2lock.putIfAbsent(getResourceId(), this);
495 * Schedules a request for execution.
497 * @param schedreq the request that should be scheduled
499 private synchronized void scheduleRequest(RunnableWithEx schedreq) {
500 logger.debug("schedule lock action {}", this);
// doRequest() runs on the lock's thread pool and processes the pending request(s)
503 getThreadPool().execute(this::doRequest);
507 * Reschedules a request for execution, if there is not already a request in the
508 * queue, and if the retry count has not been exhausted.
510 * @param req request to be rescheduled
512 private void rescheduleRequest(RunnableWithEx req) {
513 synchronized (this) {
514 if (request != null) {
515 // a new request has already been scheduled - it supersedes "req"
516 logger.debug("not rescheduling lock action {}", this);
// post-increment: the count is compared before it is bumped, so at most getMaxRetries()
// reschedules are allowed over the life of this lock object
520 if (nretries++ < feature.featProps.getMaxRetries()) {
521 logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
523 getThreadPool().schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
528 logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
533 * Gets, and removes, the next request from the queue. Clears {@link #busy} if
534 * there are no more requests in the queue.
536 * @param prevReq the previous request that was just run
538 * @return the next request, or {@code null} if the queue is empty
540 private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
// The "queue" is the single "request" field. If it still holds the request that was
// just run (compared by identity), nothing new has been scheduled since.
541 if (request == null || request == prevReq) {
542 logger.debug("no more requests for {}", this);
547 RunnableWithEx req = request;
554 * Executes the current request, if none are currently executing.
556 private void doRequest() {
// Runs on a pool thread. The busy flag, checked under this object's monitor, keeps a
// second thread from processing this lock's requests concurrently.
557 synchronized (this) {
559 // another thread is already processing the request(s)
566 * There is a race condition wherein this thread could invoke run() while the
567 * next scheduled thread checks the busy flag and finds that work is being
568 * done and returns, leaving the next work item in "request". In that case,
569 * the next work item may never be executed, thus we use a loop here, instead
570 * of just executing a single request.
572 RunnableWithEx req = null;
573 while ((req = getNextRequest(req)) != null) {
574 if (feature.resource2lock.get(getResourceId()) != this) {
576 * no longer in the map - don't apply the action, as it may interfere
577 * with any newly added Lock object
579 logger.debug("discard lock action {}", this);
580 synchronized (this) {
588 * Run the request. If it throws an SQLException whose cause is an
589 * SQLTransientException, then it will be rescheduled for execution a little later.
593 } catch (SQLException e) {
594 logger.warn(SQL_FAILED_MSG, this, e);
// NOTE(review): this tests the exception's cause, not e itself, so an
// SQLTransientException thrown directly (unwrapped) is not retried - confirm intended.
596 if (e.getCause() instanceof SQLTransientException) {
597 // retry the request a little later
598 rescheduleRequest(req);
603 } catch (RuntimeException e) {
604 logger.warn(SQL_FAILED_MSG, this, e);
611 * Attempts to add a lock to the DB. Generates a callback, indicating success or
614 * @throws SQLException if a DB error occurs
616 private void doLock() throws SQLException {
618 logger.debug("discard doLock {}", this);
623 * There is a small window in which a client could invoke free() before the DB
624 * is updated. In that case, doUnlock will be added to the queue to run after
625 * this, which will delete the record, as desired. In addition, grant() will
626 * not do anything, because the lock state will have been set to UNAVAILABLE
630 logger.debug("doLock {}", this);
631 try (Connection conn = feature.dataSource.getConnection()) {
632 boolean success = false;
// Try an insert first. If it throws (presumably because a record already exists for the
// resource), fall back to an update. Per its WHERE clause, the update only takes over a
// record held under this lock's host/UUID or one that has already expired.
634 success = doDbInsert(conn);
636 } catch (SQLException e) {
637 logger.info("failed to insert lock record - attempting update: {}", this, e);
638 success = doDbUpdate(conn);
651 * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
652 * it fails, as this should only be executed in response to a call to free().
655 * @throws SQLException if a DB error occurs
657 private void doUnlock() throws SQLException {
658 logger.debug("unlock {}", this);
// borrows a connection from the owning feature's pooled data source; the connection is
// closed (returned to the pool) when the try block exits
659 try (Connection conn = feature.dataSource.getConnection()) {
667 * Attempts to extend a lock in the DB. Generates a callback, indicating success
670 * @throws SQLException if a DB error occurs
672 private void doExtend() throws SQLException {
674 logger.debug("discard doExtend {}", this);
679 * There is a small window in which a client could invoke free() before the DB
680 * is updated. In that case, doUnlock will be added to the queue to run after
681 * this, which will delete the record, as desired. In addition, grant() will
682 * not do anything, because the lock state will have been set to UNAVAILABLE
686 logger.debug("doExtend {}", this);
687 try (Connection conn = feature.dataSource.getConnection()) {
689 * invoker may have called extend() before doLock() had a chance to insert
690 * the record, thus we have to try to insert, if the update fails
// short-circuit: the insert is attempted only when the update did not succeed
692 if (doDbUpdate(conn) || doDbInsert(conn)) {
702 * Inserts the lock into the DB.
704 * @param conn DB connection
705 * @return {@code true} if a record was successfully inserted, {@code false}
707 * @throws SQLException if a DB error occurs
709 protected boolean doDbInsert(Connection conn) throws SQLException {
710 logger.debug("insert lock record {}", this);
// The new record is owned by the current feature instance's host and UUID. It expires
// holdSec seconds after the DB server's now().
711 try (PreparedStatement stmt =
712 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
713 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
715 stmt.setString(1, getResourceId());
716 stmt.setString(2, feature.hostName);
717 stmt.setString(3, feature.uuidString);
718 stmt.setInt(4, getHoldSec());
720 stmt.executeUpdate();
// the record now carries the current feature's identity, so the lock adopts it too
722 this.hostName = feature.hostName;
723 this.uuidString = feature.uuidString;
730 * Updates the lock in the DB.
732 * @param conn DB connection
733 * @return {@code true} if a record was successfully updated, {@code false}
735 * @throws SQLException if a DB error occurs
737 protected boolean doDbUpdate(Connection conn) throws SQLException {
738 logger.debug("update lock record {}", this);
// Take over the resource's record if it is still held under this lock's current
// host/UUID, or if it has already expired. The record is re-stamped with the current
// feature instance's host/UUID and a fresh expiration time.
// (SET resourceId=? writes back the value already matched by the WHERE clause.)
739 try (PreparedStatement stmt =
740 conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
741 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
742 + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
744 stmt.setString(1, getResourceId());
745 stmt.setString(2, feature.hostName);
746 stmt.setString(3, feature.uuidString);
747 stmt.setInt(4, getHoldSec());
749 stmt.setString(5, getResourceId());
750 stmt.setString(6, this.hostName);
751 stmt.setString(7, this.uuidString);
// exactly one matching row means the lock was (re)acquired
753 if (stmt.executeUpdate() != 1) {
757 this.hostName = feature.hostName;
758 this.uuidString = feature.uuidString;
765 * Deletes the lock from the DB.
767 * @param conn DB connection
768 * @throws SQLException if a DB error occurs
770 protected void doDbDelete(Connection conn) throws SQLException {
771 logger.debug("delete lock record {}", this);
772 try (PreparedStatement stmt = conn
773 .prepareStatement("DELETE FROM pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
775 stmt.setString(1, getResourceId());
776 stmt.setString(2, this.hostName);
777 stmt.setString(3, this.uuidString);
779 stmt.executeUpdate();
784 * Removes the lock from the map, and sends a notification using the current
787 private void removeFromMap() {
788 logger.debug("remove lock from map {}", this);
789 feature.resource2lock.remove(getResourceId(), this);
791 synchronized (this) {
792 if (!isUnavailable()) {
793 deny(LOCK_LOST_MSG, true);
799 public String toString() {
// diagnostic form of the lock; this is what the "{}" placeholders in this class's log
// messages render when passed "this"
800 return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
801 + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
807 private static interface RunnableWithEx {
808 void run() throws SQLException;
811 // these may be overridden by junit tests
813 protected Properties getProperties(String fileName) {
814 return SystemPersistenceConstants.getManager().getProperties(fileName);
817 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
818 LockCallback callback) {
819 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);