7ee786b9b01e1c1e3699499bd9ba5197f95a4195
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.distributed.locking;
22
23 import java.sql.Connection;
24 import java.sql.PreparedStatement;
25 import java.sql.ResultSet;
26 import java.sql.SQLException;
27 import java.sql.SQLTransientException;
28 import java.util.HashSet;
29 import java.util.Map;
30 import java.util.Properties;
31 import java.util.Set;
32 import java.util.UUID;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.RejectedExecutionException;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import lombok.AccessLevel;
40 import lombok.Getter;
41 import lombok.Setter;
42 import org.apache.commons.dbcp2.BasicDataSource;
43 import org.apache.commons.dbcp2.BasicDataSourceFactory;
44 import org.onap.policy.common.utils.network.NetworkUtil;
45 import org.onap.policy.drools.core.lock.AlwaysFailLock;
46 import org.onap.policy.drools.core.lock.Lock;
47 import org.onap.policy.drools.core.lock.LockCallback;
48 import org.onap.policy.drools.core.lock.LockImpl;
49 import org.onap.policy.drools.core.lock.LockState;
50 import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
51 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
52 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
53 import org.onap.policy.drools.system.PolicyEngine;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57
58 /**
59  * Distributed implementation of the Lock Feature. Maintains locks across servers using a
60  * shared DB.
61  *
62  * <p/>
63  * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
64  * parameter.
65  *
66  * <p/>
67  * Additional Notes:
68  * <dl>
69  * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
70  * instead populated with the {@link #uuidString}.</li>
71  * <li>A periodic check of the DB is made to determine if any of the locks have
72  * expired.</li>
73  * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
74  * will be added to the map once free() or extend() is invoked, provided there isn't
75  * already an entry. In addition, it initially has the host and UUID of the feature
76  * instance that created it. However, as soon as doExtend() completes successfully, the
77  * host and UUID of the lock will be updated to reflect the values within this feature
78  * instance.</li>
79  * </dl>
80  */
81 public class DistributedLockManager implements PolicyResourceLockManager, PolicyEngineFeatureApi {
82
83     private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
84
85     private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
86     private static final String LOCK_LOST_MSG = "lock lost";
87     private static final String NOT_LOCKED_MSG = "not locked";
88
89     @Getter(AccessLevel.PROTECTED)
90     @Setter(AccessLevel.PROTECTED)
91     private static DistributedLockManager latestInstance = null;
92
93
94     /**
95      * Name of the host on which this JVM is running.
96      */
97     @Getter
98     private final String hostName;
99
100     /**
101      * UUID of this object.
102      */
103     @Getter
104     private final String uuidString = UUID.randomUUID().toString();
105
106     /**
107      * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
108      * lock is added to the map, it remains in the map until the lock is lost or until the
109      * unlock request completes.
110      */
111     private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
112
113     /**
114      * Engine with which this manager is associated.
115      */
116     private PolicyEngine engine;
117
118     /**
119      * Feature properties.
120      */
121     private DistributedLockProperties featProps;
122
123     /**
124      * Thread pool used to check for lock expiration and to notify owners when locks are
125      * granted or lost.
126      */
127     private ScheduledExecutorService exsvc = null;
128
129     /**
130      * Data source used to connect to the DB.
131      */
132     private BasicDataSource dataSource = null;
133
134
135     /**
136      * Constructs the object.
137      */
138     public DistributedLockManager() {
139         this.hostName = NetworkUtil.getHostname();
140     }
141
142     @Override
143     public int getSequenceNumber() {
144         return 1000;
145     }
146
147     @Override
148     public boolean isAlive() {
149         return (exsvc != null);
150     }
151
152     @Override
153     public boolean start() {
154         // handled via engine API
155         return true;
156     }
157
158     @Override
159     public boolean stop() {
160         // handled via engine API
161         return true;
162     }
163
164     @Override
165     public void shutdown() {
166         // handled via engine API
167     }
168
169     @Override
170     public boolean isLocked() {
171         return false;
172     }
173
174     @Override
175     public boolean lock() {
176         return true;
177     }
178
179     @Override
180     public boolean unlock() {
181         return true;
182     }
183
184     @Override
185     public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
186
187         try {
188             this.engine = engine;
189             this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
190             this.exsvc = getThreadPool();
191             this.dataSource = makeDataSource();
192
193             return this;
194
195         } catch (Exception e) {
196             throw new DistributedLockManagerException(e);
197         }
198     }
199
200     @Override
201     public boolean afterStart(PolicyEngine engine) {
202
203         try {
204             exsvc.execute(this::deleteExpiredDbLocks);
205             exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
206
207             setLatestInstance(this);
208
209         } catch (Exception e) {
210             throw new DistributedLockManagerException(e);
211         }
212
213         return false;
214     }
215
216     /**
217      * Make data source.
218      *
219      * @return a new, pooled data source
220      * @throws Exception exception
221      */
222     protected BasicDataSource makeDataSource() throws Exception {
223         Properties props = new Properties();
224         props.put("driverClassName", featProps.getDbDriver());
225         props.put("url", featProps.getDbUrl());
226         props.put("username", featProps.getDbUser());
227         props.put("password", featProps.getDbPwd());
228         props.put("testOnBorrow", "true");
229         props.put("poolPreparedStatements", "true");
230
231         // additional properties are listed in the GenericObjectPool API
232
233         return BasicDataSourceFactory.createDataSource(props);
234     }
235
236     /**
237      * Deletes expired locks from the DB.
238      */
239     private void deleteExpiredDbLocks() {
240         logger.info("deleting all expired locks from the DB");
241
242         try (Connection conn = dataSource.getConnection();
243                         PreparedStatement stmt = conn
244                                         .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
245
246             int ndel = stmt.executeUpdate();
247             logger.info("deleted {} expired locks from the DB", ndel);
248
249         } catch (SQLException e) {
250             logger.warn("failed to delete expired locks from the DB", e);
251         }
252     }
253
254     /**
255      * Closes the data source. Does <i>not</i> invoke any lock call-backs.
256      */
257     @Override
258     public boolean afterStop(PolicyEngine engine) {
259         exsvc = null;
260         closeDataSource();
261         return false;
262     }
263
264     /**
265      * Closes {@link #dataSource} and sets it to {@code null}.
266      */
267     private void closeDataSource() {
268         try {
269             if (dataSource != null) {
270                 dataSource.close();
271             }
272
273         } catch (SQLException e) {
274             logger.error("cannot close the distributed locking DB", e);
275         }
276
277         dataSource = null;
278     }
279
280     @Override
281     public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
282                     boolean waitForLock) {
283
284         if (latestInstance != this) {
285             AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
286             lock.notifyUnavailable();
287             return lock;
288         }
289
290         DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
291
292         DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
293
294         // do these outside of compute() to avoid blocking other map operations
295         if (existingLock == null) {
296             logger.debug("added lock to map {}", lock);
297             lock.scheduleRequest(lock::doLock);
298         } else {
299             lock.deny("resource is busy", true);
300         }
301
302         return lock;
303     }
304
305     /**
306      * Checks for expired locks.
307      */
308     private void checkExpired() {
309
310         try {
311             logger.info("checking for expired locks");
312             Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
313             identifyDbLocks(expiredIds);
314             expireLocks(expiredIds);
315
316             exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
317
318         } catch (RejectedExecutionException e) {
319             logger.warn("thread pool is no longer accepting requests", e);
320
321         } catch (SQLException | RuntimeException e) {
322             logger.error("error checking expired locks", e);
323             exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
324         }
325
326         logger.info("done checking for expired locks");
327     }
328
329     /**
330      * Identifies this feature instance's locks that the DB indicates are still active.
331      *
332      * @param expiredIds IDs of resources that have expired locks. If a resource is still
333      *        locked, it's ID is removed from this set
334      * @throws SQLException if a DB error occurs
335      */
336     private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
337         /*
338          * We could query for host and UUIDs that actually appear within the locks, but
339          * those might change while the query is running so no real value in doing that.
340          * On the other hand, there's only a brief instance between the time a
341          * deserialized lock is added to this feature instance and its doExtend() method
342          * updates its host and UUID to match this feature instance. If this happens to
343          * run during that brief instance, then the lock will be lost and the callback
344          * invoked. It isn't worth complicating this code further to handle those highly
345          * unlikely cases.
346          */
347
348         // @formatter:off
349         try (Connection conn = dataSource.getConnection();
350                     PreparedStatement stmt = conn.prepareStatement(
351                         "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
352             // @formatter:on
353
354             stmt.setString(1, hostName);
355             stmt.setString(2, uuidString);
356
357             try (ResultSet resultSet = stmt.executeQuery()) {
358                 while (resultSet.next()) {
359                     String resourceId = resultSet.getString(1);
360
361                     // we have now seen this resource id
362                     expiredIds.remove(resourceId);
363                 }
364             }
365         }
366     }
367
368     /**
369      * Expires locks for the resources that no longer appear within the DB.
370      *
371      * @param expiredIds IDs of resources that have expired locks
372      */
373     private void expireLocks(Set<String> expiredIds) {
374         for (String resourceId : expiredIds) {
375             AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
376
377             resource2lock.computeIfPresent(resourceId, (key, lock) -> {
378                 if (lock.isActive()) {
379                     // it thinks it's active, but it isn't - remove from the map
380                     lockref.set(lock);
381                     return null;
382                 }
383
384                 return lock;
385             });
386
387             DistributedLock lock = lockref.get();
388             if (lock != null) {
389                 logger.debug("removed lock from map {}", lock);
390                 lock.deny(LOCK_LOST_MSG, false);
391             }
392         }
393     }
394
395     /**
396      * Distributed Lock implementation.
397      */
398     public static class DistributedLock extends LockImpl {
399         private static final String SQL_FAILED_MSG = "request failed for lock: {}";
400
401         private static final long serialVersionUID = 1L;
402
403         /**
404          * Feature containing this lock. May be {@code null} until the feature is
405          * identified. Note: this can only be null if the lock has been de-serialized.
406          */
407         private transient DistributedLockManager feature;
408
409         /**
410          * Host name from the feature instance that created this object. Replaced with the
411          * host name from the current feature instance whenever the lock is successfully
412          * extended.
413          */
414         private String hostName;
415
416         /**
417          * UUID string from the feature instance that created this object. Replaced with
418          * the UUID string from the current feature instance whenever the lock is
419          * successfully extended.
420          */
421         private String uuidString;
422
423         /**
424          * {@code True} if the lock is busy making a request, {@code false} otherwise.
425          */
426         private transient boolean busy = false;
427
428         /**
429          * Request to be performed.
430          */
431         private transient RunnableWithEx request = null;
432
433         /**
434          * Number of times we've retried a request.
435          */
436         private transient int nretries = 0;
437
438         /**
439          * Constructs the object.
440          */
441         public DistributedLock() {
442             this.hostName = "";
443             this.uuidString = "";
444         }
445
446         /**
447          * Constructs the object.
448          *
449          * @param state initial state of the lock
450          * @param resourceId identifier of the resource to be locked
451          * @param ownerKey information identifying the owner requesting the lock
452          * @param holdSec amount of time, in seconds, for which the lock should be held,
453          *        after which it will automatically be released
454          * @param callback callback to be invoked once the lock is granted, or
455          *        subsequently lost; must not be {@code null}
456          * @param feature feature containing this lock
457          */
458         public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
459                         DistributedLockManager feature) {
460             super(state, resourceId, ownerKey, holdSec, callback);
461
462             this.feature = feature;
463             this.hostName = feature.hostName;
464             this.uuidString = feature.uuidString;
465         }
466
467         /**
468          * Grants this lock. The notification is <i>always</i> invoked via the
469          * <i>foreground</i> thread.
470          */
471         protected void grant() {
472             synchronized (this) {
473                 if (isUnavailable()) {
474                     return;
475                 }
476
477                 setState(LockState.ACTIVE);
478             }
479
480             logger.info("lock granted: {}", this);
481
482             notifyAvailable();
483         }
484
485         /**
486          * Permanently denies this lock.
487          *
488          * @param reason the reason the lock was denied
489          * @param foreground {@code true} if the callback can be invoked in the current
490          *        (i.e., foreground) thread, {@code false} if it should be invoked via the
491          *        executor
492          */
493         protected void deny(String reason, boolean foreground) {
494             synchronized (this) {
495                 setState(LockState.UNAVAILABLE);
496             }
497
498             logger.info("{}: {}", reason, this);
499
500             if (feature == null || foreground) {
501                 notifyUnavailable();
502
503             } else {
504                 feature.exsvc.execute(this::notifyUnavailable);
505             }
506         }
507
508         @Override
509         public boolean free() {
510             // do a quick check of the state
511             if (isUnavailable()) {
512                 return false;
513             }
514
515             logger.info("releasing lock: {}", this);
516
517             if (!attachFeature()) {
518                 setState(LockState.UNAVAILABLE);
519                 return false;
520             }
521
522             AtomicBoolean result = new AtomicBoolean(false);
523
524             feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
525                 if (curlock == this && !isUnavailable()) {
526                     // this lock was the owner
527                     result.set(true);
528                     setState(LockState.UNAVAILABLE);
529
530                     /*
531                      * NOTE: do NOT return null; curlock must remain until doUnlock
532                      * completes.
533                      */
534                 }
535
536                 return curlock;
537             });
538
539             if (result.get()) {
540                 scheduleRequest(this::doUnlock);
541                 return true;
542             }
543
544             return false;
545         }
546
547         @Override
548         public void extend(int holdSec, LockCallback callback) {
549             if (holdSec < 0) {
550                 throw new IllegalArgumentException("holdSec is negative");
551             }
552
553             setHoldSec(holdSec);
554             setCallback(callback);
555
556             // do a quick check of the state
557             if (isUnavailable() || !attachFeature()) {
558                 deny(LOCK_LOST_MSG, true);
559                 return;
560             }
561
562             AtomicBoolean success = new AtomicBoolean(false);
563
564             feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
565                 if (curlock == this && !isUnavailable()) {
566                     success.set(true);
567                     setState(LockState.WAITING);
568                 }
569
570                 // note: leave it in the map until doUnlock() removes it
571
572                 return curlock;
573             });
574
575             if (success.get()) {
576                 scheduleRequest(this::doExtend);
577
578             } else {
579                 deny(NOT_LOCKED_MSG, true);
580             }
581         }
582
583         /**
584          * Attaches to the feature instance, if not already attached.
585          *
586          * @return {@code true} if the lock is now attached to a feature, {@code false}
587          *         otherwise
588          */
589         private synchronized boolean attachFeature() {
590             if (feature != null) {
591                 // already attached
592                 return true;
593             }
594
595             feature = latestInstance;
596             if (feature == null) {
597                 logger.warn("no feature yet for {}", this);
598                 return false;
599             }
600
601             // put this lock into the map
602             feature.resource2lock.putIfAbsent(getResourceId(), this);
603
604             return true;
605         }
606
607         /**
608          * Schedules a request for execution.
609          *
610          * @param schedreq the request that should be scheduled
611          */
612         private synchronized void scheduleRequest(RunnableWithEx schedreq) {
613             logger.debug("schedule lock action {}", this);
614             nretries = 0;
615             request = schedreq;
616             feature.exsvc.execute(this::doRequest);
617         }
618
619         /**
620          * Reschedules a request for execution, if there is not already a request in the
621          * queue, and if the retry count has not been exhausted.
622          *
623          * @param req request to be rescheduled
624          */
625         private void rescheduleRequest(RunnableWithEx req) {
626             synchronized (this) {
627                 if (request != null) {
628                     // a new request has already been scheduled - it supersedes "req"
629                     logger.debug("not rescheduling lock action {}", this);
630                     return;
631                 }
632
633                 if (nretries++ < feature.featProps.getMaxRetries()) {
634                     logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
635                     request = req;
636                     feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
637                     return;
638                 }
639             }
640
641             logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
642             removeFromMap();
643         }
644
645         /**
646          * Gets, and removes, the next request from the queue. Clears {@link #busy} if
647          * there are no more requests in the queue.
648          *
649          * @param prevReq the previous request that was just run
650          *
651          * @return the next request, or {@code null} if the queue is empty
652          */
653         private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
654             if (request == null || request == prevReq) {
655                 logger.debug("no more requests for {}", this);
656                 busy = false;
657                 return null;
658             }
659
660             RunnableWithEx req = request;
661             request = null;
662
663             return req;
664         }
665
666         /**
667          * Executes the current request, if none are currently executing.
668          */
669         private void doRequest() {
670             synchronized (this) {
671                 if (busy) {
672                     // another thread is already processing the request(s)
673                     return;
674                 }
675                 busy = true;
676             }
677
678             /*
679              * There is a race condition wherein this thread could invoke run() while the
680              * next scheduled thread checks the busy flag and finds that work is being
681              * done and returns, leaving the next work item in "request". In that case,
682              * the next work item may never be executed, thus we use a loop here, instead
683              * of just executing a single request.
684              */
685             RunnableWithEx req = null;
686             while ((req = getNextRequest(req)) != null) {
687                 if (feature.resource2lock.get(getResourceId()) != this) {
688                     /*
689                      * no longer in the map - don't apply the action, as it may interfere
690                      * with any newly added Lock object
691                      */
692                     logger.debug("discard lock action {}", this);
693                     synchronized (this) {
694                         busy = false;
695                     }
696                     return;
697                 }
698
699                 try {
700                     /*
701                      * Run the request. If it throws an exception, then it will be
702                      * rescheduled for execution a little later.
703                      */
704                     req.run();
705
706                 } catch (SQLException e) {
707                     logger.warn(SQL_FAILED_MSG, this, e);
708
709                     if (e.getCause() instanceof SQLTransientException) {
710                         // retry the request a little later
711                         rescheduleRequest(req);
712                     } else {
713                         removeFromMap();
714                     }
715
716                 } catch (RuntimeException e) {
717                     logger.warn(SQL_FAILED_MSG, this, e);
718                     removeFromMap();
719                 }
720             }
721         }
722
723         /**
724          * Attempts to add a lock to the DB. Generates a callback, indicating success or
725          * failure.
726          *
727          * @throws SQLException if a DB error occurs
728          */
729         private void doLock() throws SQLException {
730             if (!isWaiting()) {
731                 logger.debug("discard doLock {}", this);
732                 return;
733             }
734
735             /*
736              * There is a small window in which a client could invoke free() before the DB
737              * is updated. In that case, doUnlock will be added to the queue to run after
738              * this, which will delete the record, as desired. In addition, grant() will
739              * not do anything, because the lock state will have been set to UNAVAILABLE
740              * by free().
741              */
742
743             logger.debug("doLock {}", this);
744             try (Connection conn = feature.dataSource.getConnection()) {
745                 boolean success = false;
746                 try {
747                     success = doDbInsert(conn);
748
749                 } catch (SQLException e) {
750                     logger.info("failed to insert lock record - attempting update: {}", this, e);
751                     success = doDbUpdate(conn);
752                 }
753
754                 if (success) {
755                     grant();
756                     return;
757                 }
758             }
759
760             removeFromMap();
761         }
762
763         /**
764          * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
765          * it fails, as this should only be executed in response to a call to
766          * {@link #free()}.
767          *
768          * @throws SQLException if a DB error occurs
769          */
770         private void doUnlock() throws SQLException {
771             logger.debug("unlock {}", this);
772             try (Connection conn = feature.dataSource.getConnection()) {
773                 doDbDelete(conn);
774             }
775
776             removeFromMap();
777         }
778
779         /**
780          * Attempts to extend a lock in the DB. Generates a callback, indicating success
781          * or failure.
782          *
783          * @throws SQLException if a DB error occurs
784          */
785         private void doExtend() throws SQLException {
786             if (!isWaiting()) {
787                 logger.debug("discard doExtend {}", this);
788                 return;
789             }
790
791             /*
792              * There is a small window in which a client could invoke free() before the DB
793              * is updated. In that case, doUnlock will be added to the queue to run after
794              * this, which will delete the record, as desired. In addition, grant() will
795              * not do anything, because the lock state will have been set to UNAVAILABLE
796              * by free().
797              */
798
799             logger.debug("doExtend {}", this);
800             try (Connection conn = feature.dataSource.getConnection()) {
801                 /*
802                  * invoker may have called extend() before free() had a chance to insert
803                  * the record, thus we have to try to insert, if the update fails
804                  */
805                 if (doDbUpdate(conn) || doDbInsert(conn)) {
806                     grant();
807                     return;
808                 }
809             }
810
811             removeFromMap();
812         }
813
814         /**
815          * Inserts the lock into the DB.
816          *
817          * @param conn DB connection
818          * @return {@code true} if a record was successfully inserted, {@code false}
819          *         otherwise
820          * @throws SQLException if a DB error occurs
821          */
822         protected boolean doDbInsert(Connection conn) throws SQLException {
823             logger.debug("insert lock record {}", this);
824             try (PreparedStatement stmt =
825                             conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
826                                             + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
827
828                 stmt.setString(1, getResourceId());
829                 stmt.setString(2, feature.hostName);
830                 stmt.setString(3, feature.uuidString);
831                 stmt.setInt(4, getHoldSec());
832
833                 stmt.executeUpdate();
834
835                 this.hostName = feature.hostName;
836                 this.uuidString = feature.uuidString;
837
838                 return true;
839             }
840         }
841
842         /**
843          * Updates the lock in the DB.
844          *
845          * @param conn DB connection
846          * @return {@code true} if a record was successfully updated, {@code false}
847          *         otherwise
848          * @throws SQLException if a DB error occurs
849          */
850         protected boolean doDbUpdate(Connection conn) throws SQLException {
851             logger.debug("update lock record {}", this);
852             try (PreparedStatement stmt =
853                             conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
854                                             + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
855                                             + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
856
857                 stmt.setString(1, getResourceId());
858                 stmt.setString(2, feature.hostName);
859                 stmt.setString(3, feature.uuidString);
860                 stmt.setInt(4, getHoldSec());
861
862                 stmt.setString(5, getResourceId());
863                 stmt.setString(6, this.hostName);
864                 stmt.setString(7, this.uuidString);
865
866                 if (stmt.executeUpdate() != 1) {
867                     return false;
868                 }
869
870                 this.hostName = feature.hostName;
871                 this.uuidString = feature.uuidString;
872
873                 return true;
874             }
875         }
876
877         /**
878          * Deletes the lock from the DB.
879          *
880          * @param conn DB connection
881          * @throws SQLException if a DB error occurs
882          */
883         protected void doDbDelete(Connection conn) throws SQLException {
884             logger.debug("delete lock record {}", this);
885             try (PreparedStatement stmt =
886                             conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
887
888                 stmt.setString(1, getResourceId());
889                 stmt.setString(2, this.hostName);
890                 stmt.setString(3, this.uuidString);
891
892                 stmt.executeUpdate();
893             }
894         }
895
896         /**
897          * Removes the lock from the map, and sends a notification using the current
898          * thread.
899          */
900         private void removeFromMap() {
901             logger.debug("remove lock from map {}", this);
902             feature.resource2lock.remove(getResourceId(), this);
903
904             synchronized (this) {
905                 if (!isUnavailable()) {
906                     deny(LOCK_LOST_MSG, true);
907                 }
908             }
909         }
910
911         @Override
912         public String toString() {
913             return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
914                             + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
915                             + uuidString + "]";
916         }
917     }
918
919     @FunctionalInterface
920     private static interface RunnableWithEx {
921         void run() throws SQLException;
922     }
923
924     // these may be overridden by junit tests
925
926     protected Properties getProperties(String fileName) {
927         return SystemPersistenceConstants.getManager().getProperties(fileName);
928     }
929
930     protected ScheduledExecutorService getThreadPool() {
931         return engine.getExecutorService();
932     }
933
934     protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
935                     LockCallback callback) {
936         return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);
937     }
938 }