523c0d933fc8b11e5b6bb9344b53d0b80e62a964
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.distributed.locking;
22
23 import java.sql.Connection;
24 import java.sql.PreparedStatement;
25 import java.sql.ResultSet;
26 import java.sql.SQLException;
27 import java.util.HashSet;
28 import java.util.Map;
29 import java.util.Properties;
30 import java.util.Set;
31 import java.util.UUID;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.RejectedExecutionException;
34 import java.util.concurrent.ScheduledExecutorService;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38 import lombok.AccessLevel;
39 import lombok.Getter;
40 import lombok.Setter;
41 import org.apache.commons.dbcp2.BasicDataSource;
42 import org.apache.commons.dbcp2.BasicDataSourceFactory;
43 import org.onap.policy.common.utils.network.NetworkUtil;
44 import org.onap.policy.drools.core.lock.AlwaysFailLock;
45 import org.onap.policy.drools.core.lock.Lock;
46 import org.onap.policy.drools.core.lock.LockCallback;
47 import org.onap.policy.drools.core.lock.LockImpl;
48 import org.onap.policy.drools.core.lock.LockState;
49 import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
50 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
51 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
52 import org.onap.policy.drools.system.PolicyEngine;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56
57 /**
58  * Distributed implementation of the Lock Feature. Maintains locks across servers using a
59  * shared DB.
60  *
61  * <p/>
62  * Note: this implementation does <i>not</i> honor the waitForLocks={@code true}
63  * parameter.
64  *
65  * <p/>
66  * Additional Notes:
67  * <dl>
68  * <li>The <i>owner</i> field in the DB is not derived from the lock's owner info, but is
69  * instead populated with the {@link #uuidString}.</li>
70  * <li>A periodic check of the DB is made to determine if any of the locks have
71  * expired.</li>
72  * <li>When a lock is deserialized, it will not initially appear in this feature's map; it
73  * will be added to the map once free() or extend() is invoked, provided there isn't
74  * already an entry. In addition, it initially has the host and UUID of the feature
75  * instance that created it. However, as soon as doExtend() completes successfully, the
76  * host and UUID of the lock will be updated to reflect the values within this feature
77  * instance.</li>
78  * </dl>
79  */
80 public class DistributedLockManager
81                 implements PolicyResourceLockManager, PolicyEngineFeatureApi {
82
83     private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
84
85     private static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
86     private static final String LOCK_LOST_MSG = "lock lost";
87     private static final String NOT_LOCKED_MSG = "not locked";
88
89     @Getter(AccessLevel.PROTECTED)
90     @Setter(AccessLevel.PROTECTED)
91     private static DistributedLockManager latestInstance = null;
92
93
94     /**
95      * Name of the host on which this JVM is running.
96      */
97     @Getter
98     private final String hostName;
99
100     /**
101      * UUID of this object.
102      */
103     @Getter
104     private final String uuidString = UUID.randomUUID().toString();
105
106     /**
107      * Maps a resource to the lock that owns it, or is awaiting a request for it. Once a
108      * lock is added to the map, it remains in the map until the lock is lost or until the
109      * unlock request completes.
110      */
111     private final Map<String, DistributedLock> resource2lock = new ConcurrentHashMap<>();
112
113     /**
114      * Engine with which this manager is associated.
115      */
116     private PolicyEngine engine;
117
118     /**
119      * Feature properties.
120      */
121     private DistributedLockProperties featProps;
122
123     /**
124      * Thread pool used to check for lock expiration and to notify owners when locks are
125      * granted or lost.
126      */
127     private ScheduledExecutorService exsvc = null;
128
129     /**
130      * Data source used to connect to the DB.
131      */
132     private BasicDataSource dataSource = null;
133
134
135     /**
136      * Constructs the object.
137      */
138     public DistributedLockManager() {
139         this.hostName = NetworkUtil.getHostname();
140     }
141
142     @Override
143     public int getSequenceNumber() {
144         return 1000;
145     }
146
147     @Override
148     public boolean isAlive() {
149         return (exsvc != null);
150     }
151
152     @Override
153     public boolean start() {
154         // handled via engine API
155         return true;
156     }
157
158     @Override
159     public boolean stop() {
160         // handled via engine API
161         return true;
162     }
163
164     @Override
165     public void shutdown() {
166         // handled via engine API
167     }
168
169     @Override
170     public boolean isLocked() {
171         return false;
172     }
173
174     @Override
175     public boolean lock() {
176         return true;
177     }
178
179     @Override
180     public boolean unlock() {
181         return true;
182     }
183
184     @Override
185     public PolicyResourceLockManager beforeCreateLockManager(PolicyEngine engine, Properties properties) {
186
187         try {
188             this.engine = engine;
189             this.featProps = new DistributedLockProperties(getProperties(CONFIGURATION_PROPERTIES_NAME));
190             this.exsvc = getThreadPool();
191             this.dataSource = makeDataSource();
192
193             return this;
194
195         } catch (Exception e) {
196             throw new DistributedLockManagerException(e);
197         }
198     }
199
200     @Override
201     public boolean afterStart(PolicyEngine engine) {
202
203         try {
204             exsvc.execute(this::deleteExpiredDbLocks);
205             exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
206
207             setLatestInstance(this);
208
209         } catch (Exception e) {
210             throw new DistributedLockManagerException(e);
211         }
212
213         return false;
214     }
215
216     /**
217      * Make data source.
218      *
219      * @return a new, pooled data source
220      * @throws Exception exception
221      */
222     protected BasicDataSource makeDataSource() throws Exception {
223         Properties props = new Properties();
224         props.put("driverClassName", featProps.getDbDriver());
225         props.put("url", featProps.getDbUrl());
226         props.put("username", featProps.getDbUser());
227         props.put("password", featProps.getDbPwd());
228         props.put("testOnBorrow", "true");
229         props.put("poolPreparedStatements", "true");
230
231         // additional properties are listed in the GenericObjectPool API
232
233         return BasicDataSourceFactory.createDataSource(props);
234     }
235
236     /**
237      * Deletes expired locks from the DB.
238      */
239     private void deleteExpiredDbLocks() {
240         logger.info("deleting all expired locks from the DB");
241
242         try (Connection conn = dataSource.getConnection();
243                         PreparedStatement stmt = conn
244                                         .prepareStatement("DELETE FROM pooling.locks WHERE expirationTime <= now()")) {
245
246             int ndel = stmt.executeUpdate();
247             logger.info("deleted {} expired locks from the DB", ndel);
248
249         } catch (SQLException e) {
250             logger.warn("failed to delete expired locks from the DB", e);
251         }
252     }
253
254     /**
255      * Closes the data source. Does <i>not</i> invoke any lock call-backs.
256      */
257     @Override
258     public boolean afterStop(PolicyEngine engine) {
259         exsvc = null;
260         closeDataSource();
261         return false;
262     }
263
264     /**
265      * Closes {@link #dataSource} and sets it to {@code null}.
266      */
267     private void closeDataSource() {
268         try {
269             if (dataSource != null) {
270                 dataSource.close();
271             }
272
273         } catch (SQLException e) {
274             logger.error("cannot close the distributed locking DB", e);
275         }
276
277         dataSource = null;
278     }
279
280     @Override
281     public Lock createLock(String resourceId, String ownerKey, int holdSec, LockCallback callback,
282                     boolean waitForLock) {
283
284         if (latestInstance != this) {
285             AlwaysFailLock lock = new AlwaysFailLock(resourceId, ownerKey, holdSec, callback);
286             lock.notifyUnavailable();
287             return lock;
288         }
289
290         DistributedLock lock = makeLock(LockState.WAITING, resourceId, ownerKey, holdSec, callback);
291
292         DistributedLock existingLock = resource2lock.putIfAbsent(resourceId, lock);
293
294         // do these outside of compute() to avoid blocking other map operations
295         if (existingLock == null) {
296             logger.debug("added lock to map {}", lock);
297             lock.scheduleRequest(lock::doLock);
298         } else {
299             lock.deny("resource is busy", true);
300         }
301
302         return lock;
303     }
304
305     /**
306      * Checks for expired locks.
307      */
308     private void checkExpired() {
309
310         try {
311             logger.info("checking for expired locks");
312             Set<String> expiredIds = new HashSet<>(resource2lock.keySet());
313             identifyDbLocks(expiredIds);
314             expireLocks(expiredIds);
315
316             exsvc.schedule(this::checkExpired, featProps.getExpireCheckSec(), TimeUnit.SECONDS);
317
318         } catch (RejectedExecutionException e) {
319             logger.warn("thread pool is no longer accepting requests", e);
320
321         } catch (SQLException | RuntimeException e) {
322             logger.error("error checking expired locks", e);
323             exsvc.schedule(this::checkExpired, featProps.getRetrySec(), TimeUnit.SECONDS);
324         }
325
326         logger.info("done checking for expired locks");
327     }
328
329     /**
330      * Identifies this feature instance's locks that the DB indicates are still active.
331      *
332      * @param expiredIds IDs of resources that have expired locks. If a resource is still
333      *        locked, it's ID is removed from this set
334      * @throws SQLException if a DB error occurs
335      */
336     private void identifyDbLocks(Set<String> expiredIds) throws SQLException {
337         /*
338          * We could query for host and UUIDs that actually appear within the locks, but
339          * those might change while the query is running so no real value in doing that.
340          * On the other hand, there's only a brief instance between the time a
341          * deserialized lock is added to this feature instance and its doExtend() method
342          * updates its host and UUID to match this feature instance. If this happens to
343          * run during that brief instance, then the lock will be lost and the callback
344          * invoked. It isn't worth complicating this code further to handle those highly
345          * unlikely cases.
346          */
347
348         // @formatter:off
349         try (Connection conn = dataSource.getConnection();
350                     PreparedStatement stmt = conn.prepareStatement(
351                         "SELECT resourceId FROM pooling.locks WHERE host=? AND owner=? AND expirationTime > now()")) {
352             // @formatter:on
353
354             stmt.setString(1, hostName);
355             stmt.setString(2, uuidString);
356
357             try (ResultSet resultSet = stmt.executeQuery()) {
358                 while (resultSet.next()) {
359                     String resourceId = resultSet.getString(1);
360
361                     // we have now seen this resource id
362                     expiredIds.remove(resourceId);
363                 }
364             }
365         }
366     }
367
368     /**
369      * Expires locks for the resources that no longer appear within the DB.
370      *
371      * @param expiredIds IDs of resources that have expired locks
372      */
373     private void expireLocks(Set<String> expiredIds) {
374         for (String resourceId : expiredIds) {
375             AtomicReference<DistributedLock> lockref = new AtomicReference<>(null);
376
377             resource2lock.computeIfPresent(resourceId, (key, lock) -> {
378                 if (lock.isActive()) {
379                     // it thinks it's active, but it isn't - remove from the map
380                     lockref.set(lock);
381                     return null;
382                 }
383
384                 return lock;
385             });
386
387             DistributedLock lock = lockref.get();
388             if (lock != null) {
389                 logger.debug("removed lock from map {}", lock);
390                 lock.deny(LOCK_LOST_MSG, false);
391             }
392         }
393     }
394
395     /**
396      * Distributed Lock implementation.
397      */
398     public static class DistributedLock extends LockImpl {
399         private static final long serialVersionUID = 1L;
400
401         /**
402          * Feature containing this lock. May be {@code null} until the feature is
403          * identified. Note: this can only be null if the lock has been de-serialized.
404          */
405         private transient DistributedLockManager feature;
406
407         /**
408          * Host name from the feature instance that created this object. Replaced with the
409          * host name from the current feature instance whenever the lock is successfully
410          * extended.
411          */
412         private String hostName;
413
414         /**
415          * UUID string from the feature instance that created this object. Replaced with
416          * the UUID string from the current feature instance whenever the lock is
417          * successfully extended.
418          */
419         private String uuidString;
420
421         /**
422          * {@code True} if the lock is busy making a request, {@code false} otherwise.
423          */
424         private transient boolean busy = false;
425
426         /**
427          * Request to be performed.
428          */
429         private transient RunnableWithEx request = null;
430
431         /**
432          * Number of times we've retried a request.
433          */
434         private transient int nretries = 0;
435
436         /**
437          * Constructs the object.
438          */
439         public DistributedLock() {
440             this.hostName = "";
441             this.uuidString = "";
442         }
443
444         /**
445          * Constructs the object.
446          *
447          * @param state initial state of the lock
448          * @param resourceId identifier of the resource to be locked
449          * @param ownerKey information identifying the owner requesting the lock
450          * @param holdSec amount of time, in seconds, for which the lock should be held,
451          *        after which it will automatically be released
452          * @param callback callback to be invoked once the lock is granted, or
453          *        subsequently lost; must not be {@code null}
454          * @param feature feature containing this lock
455          */
456         public DistributedLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback,
457                         DistributedLockManager feature) {
458             super(state, resourceId, ownerKey, holdSec, callback);
459
460             this.feature = feature;
461             this.hostName = feature.hostName;
462             this.uuidString = feature.uuidString;
463         }
464
465         /**
466          * Grants this lock. The notification is <i>always</i> invoked via the
467          * <i>foreground</i> thread.
468          */
469         protected void grant() {
470             synchronized (this) {
471                 if (isUnavailable()) {
472                     return;
473                 }
474
475                 setState(LockState.ACTIVE);
476             }
477
478             logger.info("lock granted: {}", this);
479
480             notifyAvailable();
481         }
482
483         /**
484          * Permanently denies this lock.
485          *
486          * @param reason the reason the lock was denied
487          * @param foreground {@code true} if the callback can be invoked in the current
488          *        (i.e., foreground) thread, {@code false} if it should be invoked via the
489          *        executor
490          */
491         protected void deny(String reason, boolean foreground) {
492             synchronized (this) {
493                 setState(LockState.UNAVAILABLE);
494             }
495
496             logger.info("{}: {}", reason, this);
497
498             if (feature == null || foreground) {
499                 notifyUnavailable();
500
501             } else {
502                 feature.exsvc.execute(this::notifyUnavailable);
503             }
504         }
505
506         @Override
507         public boolean free() {
508             // do a quick check of the state
509             if (isUnavailable()) {
510                 return false;
511             }
512
513             logger.info("releasing lock: {}", this);
514
515             if (!attachFeature()) {
516                 setState(LockState.UNAVAILABLE);
517                 return false;
518             }
519
520             AtomicBoolean result = new AtomicBoolean(false);
521
522             feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
523                 if (curlock == this && !isUnavailable()) {
524                     // this lock was the owner
525                     result.set(true);
526                     setState(LockState.UNAVAILABLE);
527
528                     /*
529                      * NOTE: do NOT return null; curlock must remain until doUnlock
530                      * completes.
531                      */
532                 }
533
534                 return curlock;
535             });
536
537             if (result.get()) {
538                 scheduleRequest(this::doUnlock);
539                 return true;
540             }
541
542             return false;
543         }
544
545         @Override
546         public void extend(int holdSec, LockCallback callback) {
547             if (holdSec < 0) {
548                 throw new IllegalArgumentException("holdSec is negative");
549             }
550
551             setHoldSec(holdSec);
552             setCallback(callback);
553
554             // do a quick check of the state
555             if (isUnavailable() || !attachFeature()) {
556                 deny(LOCK_LOST_MSG, true);
557                 return;
558             }
559
560             AtomicBoolean success = new AtomicBoolean(false);
561
562             feature.resource2lock.computeIfPresent(getResourceId(), (resourceId, curlock) -> {
563                 if (curlock == this && !isUnavailable()) {
564                     success.set(true);
565                     setState(LockState.WAITING);
566                 }
567
568                 // note: leave it in the map until doUnlock() removes it
569
570                 return curlock;
571             });
572
573             if (success.get()) {
574                 scheduleRequest(this::doExtend);
575
576             } else {
577                 deny(NOT_LOCKED_MSG, true);
578             }
579         }
580
581         /**
582          * Attaches to the feature instance, if not already attached.
583          *
584          * @return {@code true} if the lock is now attached to a feature, {@code false}
585          *         otherwise
586          */
587         private synchronized boolean attachFeature() {
588             if (feature != null) {
589                 // already attached
590                 return true;
591             }
592
593             feature = latestInstance;
594             if (feature == null) {
595                 logger.warn("no feature yet for {}", this);
596                 return false;
597             }
598
599             // put this lock into the map
600             feature.resource2lock.putIfAbsent(getResourceId(), this);
601
602             return true;
603         }
604
605         /**
606          * Schedules a request for execution.
607          *
608          * @param schedreq the request that should be scheduled
609          */
610         private synchronized void scheduleRequest(RunnableWithEx schedreq) {
611             logger.debug("schedule lock action {}", this);
612             nretries = 0;
613             request = schedreq;
614             feature.exsvc.execute(this::doRequest);
615         }
616
617         /**
618          * Reschedules a request for execution, if there is not already a request in the
619          * queue, and if the retry count has not been exhausted.
620          *
621          * @param req request to be rescheduled
622          */
623         private void rescheduleRequest(RunnableWithEx req) {
624             synchronized (this) {
625                 if (request != null) {
626                     // a new request has already been scheduled - it supersedes "req"
627                     logger.debug("not rescheduling lock action {}", this);
628                     return;
629                 }
630
631                 if (nretries++ < feature.featProps.getMaxRetries()) {
632                     logger.debug("reschedule for {}s {}", feature.featProps.getRetrySec(), this);
633                     request = req;
634                     feature.exsvc.schedule(this::doRequest, feature.featProps.getRetrySec(), TimeUnit.SECONDS);
635                     return;
636                 }
637             }
638
639             logger.warn("retry count {} exhausted for lock: {}", feature.featProps.getMaxRetries(), this);
640             removeFromMap();
641         }
642
643         /**
644          * Gets, and removes, the next request from the queue. Clears {@link #busy} if
645          * there are no more requests in the queue.
646          *
647          * @param prevReq the previous request that was just run
648          *
649          * @return the next request, or {@code null} if the queue is empty
650          */
651         private synchronized RunnableWithEx getNextRequest(RunnableWithEx prevReq) {
652             if (request == null || request == prevReq) {
653                 logger.debug("no more requests for {}", this);
654                 busy = false;
655                 return null;
656             }
657
658             RunnableWithEx req = request;
659             request = null;
660
661             return req;
662         }
663
664         /**
665          * Executes the current request, if none are currently executing.
666          */
667         private void doRequest() {
668             synchronized (this) {
669                 if (busy) {
670                     // another thread is already processing the request(s)
671                     return;
672                 }
673                 busy = true;
674             }
675
676             /*
677              * There is a race condition wherein this thread could invoke run() while the
678              * next scheduled thread checks the busy flag and finds that work is being
679              * done and returns, leaving the next work item in "request". In that case,
680              * the next work item may never be executed, thus we use a loop here, instead
681              * of just executing a single request.
682              */
683             RunnableWithEx req = null;
684             while ((req = getNextRequest(req)) != null) {
685                 if (feature.resource2lock.get(getResourceId()) != this) {
686                     /*
687                      * no longer in the map - don't apply the action, as it may interfere
688                      * with any newly added Lock object
689                      */
690                     logger.debug("discard lock action {}", this);
691                     synchronized (this) {
692                         busy = false;
693                     }
694                     return;
695                 }
696
697                 try {
698                     /*
699                      * Run the request. If it throws an exception, then it will be
700                      * rescheduled for execution a little later.
701                      */
702                     req.run();
703
704                 } catch (SQLException e) {
705                     logger.warn("request failed for lock: {}", this, e);
706
707                     if (feature.featProps.isTransient(e.getErrorCode())) {
708                         // retry the request a little later
709                         rescheduleRequest(req);
710                     } else {
711                         removeFromMap();
712                     }
713
714                 } catch (RuntimeException e) {
715                     logger.warn("request failed for lock: {}", this, e);
716                     removeFromMap();
717                 }
718             }
719         }
720
721         /**
722          * Attempts to add a lock to the DB. Generates a callback, indicating success or
723          * failure.
724          *
725          * @throws SQLException if a DB error occurs
726          */
727         private void doLock() throws SQLException {
728             if (!isWaiting()) {
729                 logger.debug("discard doLock {}", this);
730                 return;
731             }
732
733             /*
734              * There is a small window in which a client could invoke free() before the DB
735              * is updated. In that case, doUnlock will be added to the queue to run after
736              * this, which will delete the record, as desired. In addition, grant() will
737              * not do anything, because the lock state will have been set to UNAVAILABLE
738              * by free().
739              */
740
741             logger.debug("doLock {}", this);
742             try (Connection conn = feature.dataSource.getConnection()) {
743                 boolean success = false;
744                 try {
745                     success = doDbInsert(conn);
746
747                 } catch (SQLException e) {
748                     logger.info("failed to insert lock record - attempting update: {}", this, e);
749                     success = doDbUpdate(conn);
750                 }
751
752                 if (success) {
753                     grant();
754                     return;
755                 }
756             }
757
758             removeFromMap();
759         }
760
761         /**
762          * Attempts to remove a lock from the DB. Does <i>not</i> generate a callback if
763          * it fails, as this should only be executed in response to a call to
764          * {@link #free()}.
765          *
766          * @throws SQLException if a DB error occurs
767          */
768         private void doUnlock() throws SQLException {
769             logger.debug("unlock {}", this);
770             try (Connection conn = feature.dataSource.getConnection()) {
771                 doDbDelete(conn);
772             }
773
774             removeFromMap();
775         }
776
777         /**
778          * Attempts to extend a lock in the DB. Generates a callback, indicating success
779          * or failure.
780          *
781          * @throws SQLException if a DB error occurs
782          */
783         private void doExtend() throws SQLException {
784             if (!isWaiting()) {
785                 logger.debug("discard doExtend {}", this);
786                 return;
787             }
788
789             /*
790              * There is a small window in which a client could invoke free() before the DB
791              * is updated. In that case, doUnlock will be added to the queue to run after
792              * this, which will delete the record, as desired. In addition, grant() will
793              * not do anything, because the lock state will have been set to UNAVAILABLE
794              * by free().
795              */
796
797             logger.debug("doExtend {}", this);
798             try (Connection conn = feature.dataSource.getConnection()) {
799                 /*
800                  * invoker may have called extend() before free() had a chance to insert
801                  * the record, thus we have to try to insert, if the update fails
802                  */
803                 if (doDbUpdate(conn) || doDbInsert(conn)) {
804                     grant();
805                     return;
806                 }
807             }
808
809             removeFromMap();
810         }
811
812         /**
813          * Inserts the lock into the DB.
814          *
815          * @param conn DB connection
816          * @return {@code true} if a record was successfully inserted, {@code false}
817          *         otherwise
818          * @throws SQLException if a DB error occurs
819          */
820         protected boolean doDbInsert(Connection conn) throws SQLException {
821             logger.debug("insert lock record {}", this);
822             try (PreparedStatement stmt =
823                             conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
824                                             + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
825
826                 stmt.setString(1, getResourceId());
827                 stmt.setString(2, feature.hostName);
828                 stmt.setString(3, feature.uuidString);
829                 stmt.setInt(4, getHoldSec());
830
831                 stmt.executeUpdate();
832
833                 this.hostName = feature.hostName;
834                 this.uuidString = feature.uuidString;
835
836                 return true;
837             }
838         }
839
840         /**
841          * Updates the lock in the DB.
842          *
843          * @param conn DB connection
844          * @return {@code true} if a record was successfully updated, {@code false}
845          *         otherwise
846          * @throws SQLException if a DB error occurs
847          */
848         protected boolean doDbUpdate(Connection conn) throws SQLException {
849             logger.debug("update lock record {}", this);
850             try (PreparedStatement stmt =
851                             conn.prepareStatement("UPDATE pooling.locks SET resourceId=?, host=?, owner=?,"
852                                             + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?"
853                                             + " AND ((host=? AND owner=?) OR expirationTime < now())")) {
854
855                 stmt.setString(1, getResourceId());
856                 stmt.setString(2, feature.hostName);
857                 stmt.setString(3, feature.uuidString);
858                 stmt.setInt(4, getHoldSec());
859
860                 stmt.setString(5, getResourceId());
861                 stmt.setString(6, this.hostName);
862                 stmt.setString(7, this.uuidString);
863
864                 if (stmt.executeUpdate() != 1) {
865                     return false;
866                 }
867
868                 this.hostName = feature.hostName;
869                 this.uuidString = feature.uuidString;
870
871                 return true;
872             }
873         }
874
875         /**
876          * Deletes the lock from the DB.
877          *
878          * @param conn DB connection
879          * @throws SQLException if a DB error occurs
880          */
881         protected void doDbDelete(Connection conn) throws SQLException {
882             logger.debug("delete lock record {}", this);
883             try (PreparedStatement stmt =
884                             conn.prepareStatement("DELETE pooling.locks WHERE resourceId=? AND host=? AND owner=?")) {
885
886                 stmt.setString(1, getResourceId());
887                 stmt.setString(2, this.hostName);
888                 stmt.setString(3, this.uuidString);
889
890                 stmt.executeUpdate();
891             }
892         }
893
894         /**
895          * Removes the lock from the map, and sends a notification using the current
896          * thread.
897          */
898         private void removeFromMap() {
899             logger.debug("remove lock from map {}", this);
900             feature.resource2lock.remove(getResourceId(), this);
901
902             synchronized (this) {
903                 if (!isUnavailable()) {
904                     deny(LOCK_LOST_MSG, true);
905                 }
906             }
907         }
908
909         @Override
910         public String toString() {
911             return "DistributedLock [state=" + getState() + ", resourceId=" + getResourceId() + ", ownerKey="
912                             + getOwnerKey() + ", holdSec=" + getHoldSec() + ", hostName=" + hostName + ", uuidString="
913                             + uuidString + "]";
914         }
915     }
916
917     @FunctionalInterface
918     private static interface RunnableWithEx {
919         void run() throws SQLException;
920     }
921
922     // these may be overridden by junit tests
923
924     protected Properties getProperties(String fileName) {
925         return SystemPersistenceConstants.getManager().getProperties(fileName);
926     }
927
928     protected ScheduledExecutorService getThreadPool() {
929         return engine.getExecutorService();
930     }
931
932     protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
933                     LockCallback callback) {
934         return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, this);
935     }
936 }