2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.serverpool;
23 import static org.junit.Assert.assertTrue;
24 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_LOCK_AUDIT_GRACE_PERIOD;
25 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_LOCK_AUDIT_PERIOD;
26 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_LOCK_AUDIT_RETRY_DELAY;
27 import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_LOCK_TIME_TO_LIVE;
28 import static org.onap.policy.drools.serverpool.ServerPoolProperties.LOCK_AUDIT_GRACE_PERIOD;
29 import static org.onap.policy.drools.serverpool.ServerPoolProperties.LOCK_AUDIT_PERIOD;
30 import static org.onap.policy.drools.serverpool.ServerPoolProperties.LOCK_AUDIT_RETRY_DELAY;
31 import static org.onap.policy.drools.serverpool.ServerPoolProperties.LOCK_TIME_TO_LIVE;
32 import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
34 import java.io.IOException;
35 import java.io.ObjectInputStream;
36 import java.io.ObjectOutputStream;
37 import java.io.PrintStream;
38 import java.io.Serializable;
39 import java.lang.ref.Reference;
40 import java.lang.ref.ReferenceQueue;
41 import java.lang.ref.WeakReference;
42 import java.util.ArrayList;
43 import java.util.Base64;
44 import java.util.Collection;
45 import java.util.Date;
46 import java.util.HashMap;
47 import java.util.IdentityHashMap;
48 import java.util.LinkedList;
49 import java.util.List;
51 import java.util.Queue;
52 import java.util.TimerTask;
53 import java.util.TreeMap;
54 import java.util.UUID;
55 import java.util.concurrent.LinkedTransferQueue;
56 import java.util.concurrent.TimeUnit;
57 import javax.ws.rs.client.Entity;
58 import javax.ws.rs.client.WebTarget;
59 import javax.ws.rs.core.MediaType;
60 import javax.ws.rs.core.Response;
62 import lombok.NonNull;
63 import org.onap.policy.drools.core.DroolsRunnable;
64 import org.onap.policy.drools.core.PolicyContainer;
65 import org.onap.policy.drools.core.PolicySession;
66 import org.onap.policy.drools.core.lock.Lock;
67 import org.onap.policy.drools.core.lock.LockCallback;
68 import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
73 * This class provides a locking mechanism based upon a string key that
74 * identifies the lock, and another string key that identifies the owner.
75 * The existence of the 'TargetLock' instance doesn't mean that the
76 * corresponding lock has been acquired -- this is only the case if the
77 * instance is in the 'ACTIVE' state.
78 * A lock in the ACTIVE or WAITING state exists in two sets of tables,
79 * which may be on different hosts:
80 * LocalLocks - these two tables are associated with the owner key of the
81 * lock. They are in an adjunct to the bucket associated with this key,
82 * and the bucket is owned by the host containing the entry.
83 * GlobalLocks - this table is associated with the lock key. It is an
84 * adjunct to the bucket associated with this key, and the bucket is
85 * owned by the host containing the entry.
87 public class TargetLock implements Lock, Serializable {
88 private static Logger logger = LoggerFactory.getLogger(TargetLock.class);
90 // Listener class to handle state changes that require restarting the audit
91 private static EventHandler eventHandler = new EventHandler();
94 // register Listener class
95 Events.register(eventHandler);
98 // this is used to locate ACTIVE 'TargetLock' instances that have been
99 // abandoned -- as the GC cleans up the 'WeakReference' instances referring
100 // to these locks, we use that information to clean them up
101 private static ReferenceQueue<TargetLock> abandoned = new ReferenceQueue<>();
104 static final int ACCEPTED = 202; //Response.Status.ACCEPTED.getStatusCode()
105 static final int NO_CONTENT = 204; //Response.Status.NO_CONTENT.getStatusCode()
106 static final int LOCKED = 423;
108 // Values extracted from properties
110 private static String timeToLive;
111 private static long auditPeriod;
112 private static long auditGracePeriod;
113 private static long auditRetryDelay;
116 // WAITING - in line to acquire the lock
117 // ACTIVE - currently holding the lock
118 // FREE - WAITING/ACTIVE locks that were explicitly freed
119 // LOST - could occur when a de-serialized ACTIVE lock can't be made
120 // ACTIVE because there is already an ACTIVE holder of the lock
122 WAITING, ACTIVE, FREE, LOST
125 // this contains information that is placed in the 'LocalLocks' tables,
126 // and has a one-to-one correspondence with the 'TargetLock' instance
127 private Identity identity;
129 // this is the only field that can change after initialization
132 // this is used to notify the application when a lock is available,
133 // or if it is not available
134 private volatile LockCallback owner;
136 // This is what is actually called by the infrastructure to do the owner
137 // notification. The owner may be running in a Drools session, in which case
138 // the actual notification should be done within that thread -- the 'context'
139 // object ensures that it happens this way.
140 private volatile LockCallback context;
142 // HTTP query parameters
143 private static final String QP_KEY = "key";
144 private static final String QP_OWNER = "owner";
145 private static final String QP_UUID = "uuid";
146 private static final String QP_WAIT = "wait";
147 private static final String QP_SERVER = "server";
148 private static final String QP_TTL = "ttl";
151 * This method triggers registration of 'eventHandler', and also extracts
154 static void startup() {
156 getProperty(LOCK_TIME_TO_LIVE, DEFAULT_LOCK_TIME_TO_LIVE);
157 timeToLive = String.valueOf(intTimeToLive);
158 auditPeriod = getProperty(LOCK_AUDIT_PERIOD, DEFAULT_LOCK_AUDIT_PERIOD);
160 getProperty(LOCK_AUDIT_GRACE_PERIOD, DEFAULT_LOCK_AUDIT_GRACE_PERIOD);
162 getProperty(LOCK_AUDIT_RETRY_DELAY, DEFAULT_LOCK_AUDIT_RETRY_DELAY);
168 static void shutdown() {
169 AbandonedHandler ah = abandonedHandler;
172 abandonedHandler = null;
178 * Constructor - initializes the 'TargetLock' instance, and tries to go
179 * ACTIVE. The lock is initially placed in the WAITING state, and the owner
180 * will be notified when the success or failure of the lock
181 * attempt is determined.
183 * @param key string key identifying the lock
184 * @param ownerKey string key identifying the owner, which must hash to
185 * a bucket owned by the current host (it is typically a 'RequestID')
186 * @param owner owner of the lock (will be notified when going from
189 public TargetLock(String key, String ownerKey, LockCallback owner) {
190 this(key, ownerKey, owner, true);
194 * Constructor - initializes the 'TargetLock' instance, and tries to go
195 * ACTIVE. The lock is initially placed in the WAITING state, and the owner
196 * will be notified when the success or failure of the lock
197 * attempt is determined.
199 * @param key string key identifying the lock
200 * @param ownerKey string key identifying the owner, which must hash to
201 * a bucket owned by the current host (it is typically a 'RequestID')
202 * @param owner owner of the lock (will be notified when going from
204 * @param waitForLock this controls the behavior when 'key' is already
205 * locked - 'true' means wait for it to be freed, 'false' means fail
207 public TargetLock(final String key, final String ownerKey,
208 final LockCallback owner, final boolean waitForLock) {
210 throw(new IllegalArgumentException("TargetLock: 'key' can't be null"));
212 if (ownerKey == null) {
213 throw(new IllegalArgumentException("TargetLock: 'ownerKey' can't be null"));
215 if (!Bucket.isKeyOnThisServer(ownerKey)) {
216 // associated bucket is assigned to a different server
217 throw(new IllegalArgumentException("TargetLock: 'ownerKey=" + ownerKey
218 + "' not currently assigned to this server"));
221 throw(new IllegalArgumentException("TargetLock: 'owner' can't be null"));
223 identity = new Identity(key, ownerKey);
224 state = State.WAITING;
227 // determine the context
228 PolicySession session = PolicySession.getCurrentSession();
229 if (session != null) {
230 // deliver through a 'PolicySessionContext' class
231 Object lcontext = session.getAdjunct(PolicySessionContext.class);
232 if (!(lcontext instanceof LockCallback)) {
233 context = new PolicySessionContext(session);
234 session.setAdjunct(PolicySessionContext.class, context);
236 context = (LockCallback)lcontext;
239 // no context to deliver through -- call back directly to owner
243 // update 'LocalLocks' tables
244 final WeakReference<TargetLock> wr = new WeakReference<>(this, abandoned);
245 final LocalLocks localLocks = LocalLocks.get(ownerKey);
247 synchronized (localLocks) {
248 localLocks.weakReferenceToIdentity.put(wr, identity);
249 localLocks.uuidToWeakReference.put(identity.uuid, wr);
252 // The associated 'GlobalLocks' table may or may not be on a different
253 // host. Also, the following call may queue the message for later
254 // processing if the bucket is in a transient state.
255 Bucket.forwardAndProcess(key, new Bucket.Message() {
260 public void process() {
261 // 'GlobalLocks' is on the same host
262 State newState = GlobalLocks.get(key).lock(key, ownerKey, identity.uuid, waitForLock);
263 logger.info("Lock lock request: key={}, owner={}, uuid={}, wait={} (resp={})",
264 key, ownerKey, identity.uuid, waitForLock, state);
266 // The lock may now be ACTIVE, FREE, or WAITING -- we can notify
267 // the owner of the result now for ACTIVE or FREE. Also, the callback
268 // may occur while the constructor is still on the stack, although
269 // this won't happen in a Drools session.
273 // lock was successful - send notification
274 context.lockAvailable(TargetLock.this);
277 // lock attempt failed -
278 // clean up local tables, and send notification
279 synchronized (localLocks) {
280 localLocks.weakReferenceToIdentity.remove(wr);
281 localLocks.uuidToWeakReference.remove(identity.uuid);
284 context.lockUnavailable(TargetLock.this);
291 logger.error("Unknown state: {}", newState);
300 public void sendToServer(Server server, int bucketNumber) {
301 // actual lock is on a remote host -- send the request as
303 logger.info("Sending lock request to {}: key={}, owner={}, uuid={}, wait={}",
304 server, key, ownerKey, identity.uuid, waitForLock);
305 server.post("lock/lock", null, new Server.PostResponse() {
310 public WebTarget webTarget(WebTarget webTarget) {
312 .queryParam(QP_KEY, key)
313 .queryParam(QP_OWNER, ownerKey)
314 .queryParam(QP_UUID, identity.uuid.toString())
315 .queryParam(QP_WAIT, waitForLock)
316 .queryParam(QP_TTL, timeToLive);
323 public void response(Response response) {
324 logger.info("Lock response={} (code={})",
325 response, response.getStatus());
328 * there are three possible responses:
329 * 204 No Content - operation was successful
330 * 202 Accepted - operation is still in progress
331 * 423 (Locked) - lock in use, and 'waitForLock' is 'false'
333 switch (response.getStatus()) {
336 setState(State.ACTIVE);
337 context.lockAvailable(TargetLock.this);
341 // failed -- lock in use, and 'waitForLock == false'
342 setState(State.FREE);
343 synchronized (localLocks) {
344 localLocks.weakReferenceToIdentity.remove(wr);
345 localLocks.uuidToWeakReference.remove(identity.uuid);
348 context.lockUnavailable(TargetLock.this);
355 logger.error("Unknown status: {}", response.getStatus());
364 /********************/
365 /* 'Lock' Interface */
366 /********************/
369 * This method will free the current lock, or remove it from the waiting
370 * list if a response is pending.
372 * @return 'true' if successful, 'false' if it was not locked, or there
373 * appears to be corruption in 'LocalLocks' tables
376 public boolean free() {
377 synchronized (this) {
378 if (state != State.ACTIVE && state != State.WAITING) {
385 return identity.free();
389 * Return 'true' if the lock is in the ACTIVE state.
391 * @return 'true' if the lock is in the ACTIVE state, and 'false' if not
394 public synchronized boolean isActive() {
395 return state == State.ACTIVE;
399 * Return 'true' if the lock is not available.
401 * @return 'true' if the lock is in the FREE or LOST state,
405 public synchronized boolean isUnavailable() {
406 return state == State.FREE || state == State.LOST;
410 * Return 'true' if the lock is in the WAITING state.
412 * @return 'true' if the lock is in the WAITING state, and 'false' if not
414 public synchronized boolean isWaiting() {
415 return state == State.WAITING;
419 * Return the lock's key.
421 * @return the lock's key
424 public String getResourceId() {
429 * Return the owner key field.
431 * @return the owner key field
434 public String getOwnerKey() {
435 return identity.ownerKey;
439 * Extends the lock's hold time (not implemented yet).
442 public void extend(int holdSec, LockCallback callback) {
443 // not implemented yet
446 /********************/
451 * @param newState the new state value
453 private synchronized void setState(State newState) {
458 * Return the current state of the lock.
460 * @return the current state of the lock
462 public synchronized State getState() {
467 * This method is called when an incoming /lock/lock REST message is received.
469 * @param key string key identifying the lock, which must hash to a bucket
470 * owned by the current host
471 * @param ownerKey string key identifying the owner
472 * @param uuid the UUID that uniquely identifies the original 'TargetLock'
473 * @param waitForLock this controls the behavior when 'key' is already
474 * locked - 'true' means wait for it to be freed, 'false' means fail
475 * @param ttl similar to IP time-to-live -- it controls the number of hops
476 * the message may take
477 * @return the Response that should be passed back to the HTTP request
479 static Response incomingLock(String key, String ownerKey, UUID uuid, boolean waitForLock, int ttl) {
480 if (!Bucket.isKeyOnThisServer(key)) {
481 // this is the wrong server -- forward to the correct one
482 // (we can use this thread)
485 Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
486 if (server != null) {
487 WebTarget webTarget = server.getWebTarget("lock/lock");
488 if (webTarget != null) {
489 logger.warn("Forwarding 'lock/lock' to uuid {} "
490 + "(key={},owner={},uuid={},wait={},ttl={})",
491 server.getUuid(), key, ownerKey, uuid,
494 .queryParam(QP_KEY, key)
495 .queryParam(QP_OWNER, ownerKey)
496 .queryParam(QP_UUID, uuid.toString())
497 .queryParam(QP_WAIT, waitForLock)
498 .queryParam(QP_TTL, String.valueOf(ttl))
504 // if we reach this point, we didn't forward for some reason --
505 // return failure by indicating it is locked and unavailable
506 logger.error("Couldn't forward 'lock/lock' "
507 + "(key={},owner={},uuid={},wait={},ttl={})",
508 key, ownerKey, uuid, waitForLock, ttl);
509 return Response.noContent().status(LOCKED).build();
512 State state = GlobalLocks.get(key).lock(key, ownerKey, uuid, waitForLock);
515 return Response.noContent().build();
517 return Response.noContent().status(Response.Status.ACCEPTED).build();
519 return Response.noContent().status(LOCKED).build();
524 * This method is called when an incoming /lock/free REST message is received.
526 * @param key string key identifying the lock, which must hash to a bucket
527 * owned by the current host
528 * @param ownerKey string key identifying the owner
529 * @param uuid the UUID that uniquely identifies the original 'TargetLock'
530 * @param ttl similar to IP time-to-live -- it controls the number of hops
531 * the message may take
532 * @return the Response that should be passed back to the HTTP request
534 static Response incomingFree(String key, String ownerKey, UUID uuid, int ttl) {
535 if (!Bucket.isKeyOnThisServer(key)) {
536 // this is the wrong server -- forward to the correct one
537 // (we can use this thread)
540 Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
541 if (server != null) {
542 WebTarget webTarget = server.getWebTarget("lock/free");
543 if (webTarget != null) {
544 logger.warn("Forwarding 'lock/free' to uuid {} "
545 + "(key={},owner={},uuid={},ttl={})",
546 server.getUuid(), key, ownerKey, uuid, ttl);
548 .queryParam(QP_KEY, key)
549 .queryParam(QP_OWNER, ownerKey)
550 .queryParam(QP_UUID, uuid.toString())
551 .queryParam(QP_TTL, String.valueOf(ttl))
557 // if we reach this point, we didn't forward for some reason --
558 // return failure by indicating it is locked and unavailable
559 logger.error("Couldn't forward 'lock/free' "
560 + "(key={},owner={},uuid={},ttl={})",
561 key, ownerKey, uuid, ttl);
565 // TBD: should this return a more meaningful response?
566 GlobalLocks.get(key).unlock(key, uuid);
571 * This method is called when an incoming /lock/locked message is received
572 * (this is a callback to an earlier requestor that the lock is now
575 * @param key string key identifying the lock
576 * @param ownerKey string key identifying the owner, which must hash to
577 * a bucket owned by the current host (it is typically a 'RequestID')
578 * @param uuid the UUID that uniquely identifies the original 'TargetLock'
579 * @param ttl similar to IP time-to-live -- it controls the number of hops
580 * the message may take
581 * @return the Response that should be passed back to the HTTP request
583 static Response incomingLocked(String key, String ownerKey, UUID uuid, int ttl) {
584 if (!Bucket.isKeyOnThisServer(ownerKey)) {
585 // this is the wrong server -- forward to the correct one
586 // (we can use this thread)
589 Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
590 if (server != null) {
591 WebTarget webTarget = server.getWebTarget("lock/locked");
592 if (webTarget != null) {
593 logger.warn("Forwarding 'lock/locked' to uuid {} "
594 + "(key={},owner={},uuid={},ttl={})",
595 server.getUuid(), key, ownerKey, uuid, ttl);
597 .queryParam(QP_KEY, key)
598 .queryParam(QP_OWNER, ownerKey)
599 .queryParam(QP_UUID, uuid.toString())
600 .queryParam(QP_TTL, String.valueOf(ttl))
606 // if we reach this point, we didn't forward for some reason --
607 // return failure by indicating it is locked and unavailable
608 logger.error("Couldn't forward 'lock/locked' "
609 + "(key={},owner={},uuid={},ttl={})",
610 key, ownerKey, uuid, ttl);
611 return Response.noContent().status(LOCKED).build();
614 TargetLock targetLock = null;
615 LocalLocks localLocks = LocalLocks.get(ownerKey);
616 synchronized (localLocks) {
617 WeakReference<TargetLock> wr =
618 localLocks.uuidToWeakReference.get(uuid);
621 targetLock = wr.get();
622 if (targetLock == null) {
623 // lock has been abandoned
624 // (AbandonedHandler should usually find this first)
625 localLocks.weakReferenceToIdentity.remove(wr);
626 localLocks.uuidToWeakReference.remove(uuid);
628 // the lock has been made available -- update the state
629 // TBD: This could be outside of 'synchronized (localLocks)'
630 synchronized (targetLock) {
631 if (targetLock.state == State.WAITING) {
632 targetLock.state = State.ACTIVE;
634 // will return a failure -- not sure how this happened
635 logger.error("incomingLocked: {} is in state {}",
636 targetLock, targetLock.state);
642 // clean up what we can
643 localLocks.uuidToWeakReference.remove(uuid);
646 if (targetLock == null) {
647 // We can't locate the target lock
648 // TBD: This probably isn't the best error code to use
649 return Response.noContent().status(LOCKED).build();
651 targetLock.context.lockAvailable(targetLock);
652 return Response.noContent().build();
657 * This is called when the state of a bucket has changed, but is currently
658 * stable. Note that this method is called while being synchronized on the
661 * @param bucket the bucket to audit
662 * @param isOwner 'true' if the current host owns the bucket
663 * @param isBackup 'true' if the current host is a backup for the bucket
665 static void auditBucket(Bucket bucket, boolean isOwner, boolean isBackup) {
667 // we should not have any 'TargetLock' adjuncts
668 if (bucket.removeAdjunct(LocalLocks.class) != null) {
669 logger.warn("Bucket {}: Removed superfluous "
670 + "'TargetLock.LocalLocks' adjunct",
673 if (bucket.removeAdjunct(GlobalLocks.class) != null) {
674 logger.warn("Bucket {}: Removed superfluous "
675 + "'TargetLock.GlobalLocks' adjunct",
685 public String toString() {
686 return "TargetLock(key=" + identity.key
687 + ", ownerKey=" + identity.ownerKey
688 + ", uuid=" + identity.uuid
689 + ", state=" + state + ")";
697 * This method modifies the behavior of 'TargetLock' deserialization by
698 * creating the corresponding 'LocalLocks' entries.
700 private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
701 in.defaultReadObject();
702 if (state == State.ACTIVE || state == State.WAITING) {
703 // need to build entries in 'LocalLocks'
704 LocalLocks localLocks = LocalLocks.get(identity.ownerKey);
705 WeakReference<TargetLock> wr = new WeakReference<>(this, abandoned);
707 synchronized (localLocks) {
708 localLocks.weakReferenceToIdentity.put(wr, identity);
709 localLocks.uuidToWeakReference.put(identity.uuid, wr);
714 /* ============================================================ */
716 private static class LockFactory implements PolicyResourceLockManager {
717 /*****************************************/
718 /* 'PolicyResourceLockManager' interface */
719 /*****************************************/
725 public Lock createLock(String resourceId, String ownerKey,
726 int holdSec, LockCallback callback,
727 boolean waitForLock) {
728 // 'holdSec' isn't implemented yet
729 return new TargetLock(resourceId, ownerKey, callback, waitForLock);
732 /*************************/
733 /* 'Startable' interface */
734 /*************************/
740 public boolean start() {
748 public boolean stop() {
756 public void shutdown() {
757 // nothing needs to be done
764 public boolean isAlive() {
768 /************************/
769 /* 'Lockable' interface */
770 /************************/
776 public boolean lock() {
784 public boolean unlock() {
792 public boolean isLocked() {
797 private static LockFactory lockFactory = new LockFactory();
799 public static PolicyResourceLockManager getLockFactory() {
803 /* ============================================================ */
806 * There is a single instance of class 'TargetLock.EventHandler', which is
807 * registered to listen for notifications of state transitions.
809 private static class EventHandler implements Events {
814 public void newServer(Server server) {
815 // with an additional server, the offset within the audit period changes
816 Audit.scheduleAudit();
823 public void serverFailed(Server server) {
824 // with one less server, the offset within the audit period changes
825 Audit.scheduleAudit();
829 /* ============================================================ */
832 * This class usually has a one-to-one correspondence with a 'TargetLock'
833 * instance, unless the 'TargetLock' has been abandoned.
835 private static class Identity implements Serializable {
836 // this is the key associated with the lock
839 // this is the key associated with the lock requestor
842 // this is a unique identifier assigned to the 'TargetLock'
846 * Constructor - initializes the 'Identity' instance, including the
847 * generation of the unique identifier.
849 * @param key string key identifying the lock
850 * @param ownerKey string key identifying the owner, which must hash to
851 * a bucket owned by the current host (it is typically a 'RequestID')
853 private Identity(String key, String ownerKey) {
855 this.ownerKey = ownerKey;
856 this.uuid = UUID.randomUUID();
860 * Constructor - initializes the 'Identity' instance, with the 'uuid'
861 * value passed at initialization time (only used for auditing).
863 * @param key string key identifying the lock
864 * @param ownerKey string key identifying the owner
865 * @param uuid the UUID that uniquely identifies the original 'TargetLock'
867 private Identity(String key, String ownerKey, UUID uuid) {
869 this.ownerKey = ownerKey;
874 * Free the lock associated with this 'Identity' instance.
876 * @return 'false' if the 'LocalLocks' data is not there, 'true' if it is
878 private boolean free() {
880 Bucket.forwardAndProcess(key, new Bucket.Message() {
885 public void process() {
886 // the global lock entry is also on this server
887 GlobalLocks.get(key).unlock(key, uuid);
894 public void sendToServer(Server server, int bucketNumber) {
895 logger.info("Sending free request to {}: key={}, owner={}, uuid={}",
896 server, key, ownerKey, uuid);
897 server.post("lock/free", null, new Server.PostResponse() {
899 public WebTarget webTarget(WebTarget webTarget) {
901 .queryParam(QP_KEY, key)
902 .queryParam(QP_OWNER, ownerKey)
903 .queryParam(QP_UUID, uuid.toString())
904 .queryParam(QP_TTL, timeToLive);
908 public void response(Response response) {
909 logger.info("Free response={} (code={})",
910 response, response.getStatus());
911 switch (response.getStatus()) {
913 // free successful -- don't need to do anything
918 logger.error("TargetLock free failed, "
919 + "key={}, owner={}, uuid={}",
920 key, ownerKey, uuid);
924 logger.error("Unknown status: {}", response.getStatus());
932 // clean up locallocks entry
933 LocalLocks localLocks = LocalLocks.get(ownerKey);
934 synchronized (localLocks) {
935 WeakReference<TargetLock> wr =
936 localLocks.uuidToWeakReference.get(uuid);
941 localLocks.weakReferenceToIdentity.remove(wr);
942 localLocks.uuidToWeakReference.remove(uuid);
948 /***************************/
949 /* 'Object' class override */
950 /***************************/
956 public boolean equals(Object other) {
957 if (other instanceof Identity) {
958 Identity identity = (Identity)other;
959 return uuid.equals(identity.uuid)
960 && key.equals(identity.key)
961 && ownerKey.equals(identity.ownerKey);
967 /* ============================================================ */
970 * An instance of this class is used for 'TargetLock.context' when the
971 * lock is allocated within a Drools session. Its purpose is to ensure that
972 * the callback to 'TargetLock.owner' runs within the Drools thread.
974 private static class PolicySessionContext implements LockCallback, Serializable {
975 // the 'PolicySession' instance in question
976 PolicySession policySession;
979 * Constructor - initialize the 'policySession' field.
981 * @param policySession the Drools session
983 private PolicySessionContext(PolicySession policySession) {
984 this.policySession = policySession;
987 /****************************/
988 /* 'LockCallback' interface */
989 /****************************/
995 public void lockAvailable(final Lock lock) {
996 // Run 'owner.lockAvailable' within the Drools session
997 if (policySession != null) {
998 DroolsRunnable callback = () -> {
999 ((TargetLock)lock).owner.lockAvailable(lock);
1001 policySession.getKieSession().insert(callback);
1009 public void lockUnavailable(Lock lock) {
1010 // Run 'owner.lockUnavailable' within the Drools session
1011 if (policySession != null) {
1012 DroolsRunnable callback = () -> {
1013 ((TargetLock)lock).owner.lockUnavailable(lock);
1015 policySession.getKieSession().insert(callback);
1024 * Specializes serialization of 'PolicySessionContext'.
1026 private void writeObject(ObjectOutputStream out) throws IOException {
1027 // 'PolicySession' can't be serialized directly --
1028 // store as 'groupId', 'artifactId', 'sessionName'
1029 PolicyContainer pc = policySession.getPolicyContainer();
1031 out.writeObject(pc.getGroupId());
1032 out.writeObject(pc.getArtifactId());
1033 out.writeObject(policySession.getName());
1037 * Specializes deserialization of 'PolicySessionContext'.
1039 private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
1040 // 'PolicySession' can't be serialized directly --
1041 // read in 'groupId', 'artifactId', 'sessionName'
1042 String groupId = String.class.cast(in.readObject());
1043 String artifactId = String.class.cast(in.readObject());
1044 String sessionName = String.class.cast(in.readObject());
1046 // locate the 'PolicySession' associated with
1047 // 'groupId', 'artifactId', and 'sessionName'
1048 for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
1049 if (artifactId.equals(pc.getArtifactId())
1050 && groupId.equals(pc.getGroupId())) {
1051 // found 'PolicyContainer' -- look up the session
1052 policySession = pc.getPolicySession(sessionName);
1053 if (policySession == null) {
1054 logger.error("TargetLock.PolicySessionContext.readObject: "
1055 + "Can't find session {}:{}:{}",
1056 groupId, artifactId, sessionName);
1063 /* ============================================================ */
1066 * This class contains two tables that have entries for any 'TargetLock'
1067 * in the 'ACTIVE' or 'WAITING' state. This is the "client" end of the
1068 * lock implementation.
1070 static class LocalLocks {
1071 // this table makes it easier to clean up locks that have been
1072 // abandoned (see 'AbandonedHandler')
1073 private Map<WeakReference<TargetLock>, Identity> weakReferenceToIdentity = new IdentityHashMap<>();
1075 // this table is used to locate a 'TargetLock' instance from a UUID
1076 private Map<UUID, WeakReference<TargetLock>> uuidToWeakReference =
1080 * Fetch the 'LocalLocks' entry associated with a particular owner key
1081 * (it is created if necessary).
1083 * @param ownerKey string key identifying the owner, which must hash to
1084 * a bucket owned by the current host (it is typically a 'RequestID')
1085 * @return the associated 'LocalLocks' instance (it should never be 'null')
1087 private static LocalLocks get(String ownerKey) {
1088 return Bucket.getBucket(ownerKey).getAdjunct(LocalLocks.class);
1092 /* ============================================================ */
1095 * This class contains the actual lock table, which is the "server" end
1096 * of the lock implementation.
1098 public static class GlobalLocks implements Serializable {
1099 // this is the lock table, mapping 'key' to 'LockEntry', which indicates
1100 // the current lock holder, and all those waiting
1101 private Map<String, LockEntry> keyToEntry = new HashMap<>();
1104 * Fetch the 'GlobalLocks' entry associated with a particular key
1105 * (it is created if necessary).
1107 * @param key string key identifying the lock
1108 * @return the associated 'GlobalLocks' instance
1109 * (it should never be 'null')
1111 private static GlobalLocks get(String key) {
1112 return Bucket.getBucket(key).getAdjunct(GlobalLocks.class);
1116 * Do the 'lock' operation -- lock immediately, if possible. If not,
1117 * get on the waiting list, if requested.
1119 * @param key string key identifying the lock, which must hash to a bucket
1120 * owned by the current host
1121 * @param ownerKey string key identifying the owner
1122 * @param uuid the UUID that uniquely identifies the original 'TargetLock'
1123 * (on the originating host)
1124 * @param waitForLock this controls the behavior when 'key' is already
1125 * locked - 'true' means wait for it to be freed, 'false' means fail
1126 * @return the lock State corresponding to the current request
1128 synchronized State lock(String key, String ownerKey, UUID uuid, boolean waitForLock) {
1129 synchronized (keyToEntry) {
1130 LockEntry entry = keyToEntry.get(key);
1131 if (entry == null) {
1132 // there is no existing entry -- create one, and return ACTIVE
1133 entry = new LockEntry(key, ownerKey, uuid);
1134 keyToEntry.put(key, entry);
1136 return State.ACTIVE;
1139 // the requestor is willing to wait -- get on the waiting list,
1140 // and return WAITING
1141 entry.waitingList.add(new Waiting(ownerKey, uuid));
1143 return State.WAITING;
1146 // the requestor is not willing to wait -- return FREE,
1147 // which will be interpreted as a failure
1153 * Free a lock or a pending lock request.
1155 * @param key string key identifying the lock
1156 * @param uuid the UUID that uniquely identifies the original 'TargetLock'
1158 synchronized void unlock(String key, UUID uuid) {
1159 synchronized (keyToEntry) {
1160 final LockEntry entry = keyToEntry.get(key);
1161 if (entry == null) {
1162 logger.error("GlobalLocks.unlock: unknown lock, key={}, uuid={}",
1166 if (entry.currentOwnerUuid.equals(uuid)) {
1167 // this is the current lock holder
1168 if (entry.waitingList.isEmpty()) {
1170 keyToEntry.remove(key);
1172 // pass it on to the next one in the list
1173 Waiting waiting = entry.waitingList.remove();
1174 entry.currentOwnerKey = waiting.ownerKey;
1175 entry.currentOwnerUuid = waiting.ownerUuid;
1177 entry.notifyNewOwner(this);
1181 // see if one of the waiting entries is being freed
1182 for (Waiting waiting : entry.waitingList) {
1183 if (waiting.ownerUuid.equals(uuid)) {
1184 entry.waitingList.remove(waiting);
1194 * Notify all features that an update has occurred on this GlobalLock.
1196 * @param key the key associated with the change
1197 * (used to locate the bucket)
1199 private void sendUpdate(String key) {
1200 Bucket bucket = Bucket.getBucket(key);
1201 for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
1202 feature.lockUpdate(bucket, this);
1210 private void writeObject(ObjectOutputStream out) throws IOException {
1211 synchronized (this) {
1212 out.defaultWriteObject();
1217 /* ============================================================ */
1220 * Each instance of this object corresponds to a single key in the lock
1221 * table. It includes the current holder of the lock, as well as
1222 * any that are waiting.
1224 private static class LockEntry implements Serializable {
1225 // string key identifying the lock
1228 // string key identifying the owner
1229 private String currentOwnerKey;
1231 // UUID identifying the original 'TargetLock
1232 private UUID currentOwnerUuid;
1234 // list of pending lock requests for this key
1235 private Queue<Waiting> waitingList = new LinkedList<>();
1238 * Constructor - initialize the 'LockEntry'.
1240 * @param key string key identifying the lock, which must hash to a bucket
1241 * owned by the current host
1242 * @param ownerKey string key identifying the owner
1243 * @param uuid the UUID that uniquely identifies the original 'TargetLock'
1245 private LockEntry(String key, String ownerKey, UUID uuid) {
1247 this.currentOwnerKey = ownerKey;
1248 this.currentOwnerUuid = uuid;
1252 * This method is called after the 'currentOwnerKey' and
1253 * 'currentOwnerUuid' fields have been updated, and it notifies the new
1254 * owner that they now have the lock.
1256 * @param globalLocks the 'GlobalLocks' instance containing this entry
1258 private void notifyNewOwner(final GlobalLocks globalLocks) {
1259 Bucket.forwardAndProcess(currentOwnerKey, new Bucket.Message() {
1264 public void process() {
1265 // the new owner is on this host
1266 incomingLocked(key, currentOwnerKey, currentOwnerUuid, 1);
1273 public void sendToServer(Server server, int bucketNumber) {
1274 // the new owner is on a remote host
1275 logger.info("Sending locked notification to {}: key={}, owner={}, uuid={}",
1276 server, key, currentOwnerKey, currentOwnerUuid);
1277 server.post("lock/locked", null, new Server.PostResponse() {
1279 public WebTarget webTarget(WebTarget webTarget) {
1281 .queryParam(QP_KEY, key)
1282 .queryParam(QP_OWNER, currentOwnerKey)
1283 .queryParam(QP_UUID, currentOwnerUuid.toString())
1284 .queryParam(QP_TTL, timeToLive);
1288 public void response(Response response) {
1289 logger.info("Locked response={} (code={})",
1290 response, response.getStatus());
1291 if (response.getStatus() != NO_CONTENT) {
1292 // notification failed -- free this one
1293 globalLocks.unlock(key, currentOwnerUuid);
1303 /* ============================================================ */
1306 * This corresponds to a member of 'LockEntry.waitingList'
1308 private static class Waiting implements Serializable {
1309 // string key identifying the owner
1312 // uniquely identifies the new owner 'TargetLock'
1318 * @param ownerKey string key identifying the owner
1319 * @param ownerUuid uniquely identifies the new owner 'TargetLock'
1321 private Waiting(String ownerKey, UUID ownerUuid) {
1322 this.ownerKey = ownerKey;
1323 this.ownerUuid = ownerUuid;
1327 /* ============================================================ */
1330 * Backup data associated with a 'GlobalLocks' instance.
1332 static class LockBackup implements Bucket.Backup {
1337 public Bucket.Restore generate(int bucketNumber) {
1338 Bucket bucket = Bucket.getBucket(bucketNumber);
1340 // just remove 'LocalLocks' -- it will need to be rebuilt from
1341 // 'TargetLock' instances
1342 bucket.removeAdjunct(LocalLocks.class);
1344 // global locks need to be transferred
1345 GlobalLocks globalLocks = bucket.removeAdjunct(GlobalLocks.class);
1346 return globalLocks == null ? null : new LockRestore(globalLocks);
1350 /* ============================================================ */
1353 * This class is used to restore a 'GlobalLocks' instance from a backup.
1355 static class LockRestore implements Bucket.Restore, Serializable {
1356 GlobalLocks globalLocks;
1359 * Constructor - runs as part of backup (deserialization bypasses this constructor).
1361 * @param globalLocks GlobalLocks instance extracted as part of backup
1363 LockRestore(GlobalLocks globalLocks) {
1364 this.globalLocks = globalLocks;
1371 public void restore(int bucketNumber) {
1373 Bucket bucket = Bucket.getBucket(bucketNumber);
1375 // update the adjunct
1376 if (bucket.putAdjunct(globalLocks) != null) {
1377 logger.error("LockRestore({}): GlobalLocks adjunct already existed",
1381 // notify features of the 'globalLocks' update
1382 for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
1383 feature.lockUpdate(bucket, globalLocks);
1388 /* ============================================================ */
// Thread subclass that takes entries off the 'abandoned' reference queue
// and, for each one, scans every bucket's 'LocalLocks' adjunct (if it
// exists) for the matching 'Identity', logging an error when one is found.
// NOTE(review): the statements that follow the error log for a matched
// identity are not visible in this view -- confirm against the full source
// before changing this block.
1391 * This class is a deamon that monitors the 'abandoned' queue. If an
1392 * ACTIVE 'TargetLock' is abandoned, the GC will eventually place the
1393 * now-empty 'WeakReference' in this queue.
1395 private static class AbandonedHandler extends Thread {
1396 AbandonedHandler() {
// the string passed to the 'Thread' constructor is the thread name
1397 super("TargetLock.AbandonedHandler");
1401 * This method camps on the 'abandoned' queue, processing entries as
1402 * they are received.
// the loop continues for as long as the static 'abandonedHandler'
// reference is non-null
1406 while (abandonedHandler != null) {
// 'remove()' blocks until a cleared reference is available
1408 Reference<? extends TargetLock> wr = abandoned.remove();
1410 // At this point, we know that 'ref' is a
1411 // 'WeakReference<TargetLock>' instance that has been abandoned,
1412 // but we don't know what the associated 'Identity' instance
1413 // is. Here, we search through every bucket looking for a
1414 // matching entry. The assumption is that this is rare enough,
1415 // and due to a bug, so it doesn't hurt to spend extra CPU time
1416 // here. The alternative is to add some additional information
1417 // to make this mapping quick, at the expense of a slight
1418 // slow down of normal lock operations.
1419 for (int i = 0 ; i < Bucket.BUCKETCOUNT ; i += 1) {
1420 LocalLocks localLocks =
// 'getAdjunctDontCreate' is used so the scan doesn't create adjuncts;
// a 'null' result is skipped by the check below
1421 Bucket.getBucket(i).getAdjunctDontCreate(LocalLocks.class);
1422 if (localLocks != null) {
1423 // the adjunct does exist -- see if the WeakReference
1424 // instance is known to this bucket
// the table lookup is done while holding the 'localLocks' monitor
1425 synchronized (localLocks) {
1427 localLocks.weakReferenceToIdentity.get(wr);
1428 if (identity != null) {
1430 logger.error("Abandoned TargetLock: bucket={}, "
1431 + "key={}, ownerKey={}, uuid={}",
1432 i, identity.key, identity.ownerKey,
// any exception raised in the loop body is logged, and the loop continues
1440 } catch (Exception e) {
1441 logger.error("TargetLock.AbandonedHandler exception", e);
// The single 'AbandonedHandler' instance is created when the field is
// initialized, and the thread is started by the 'start()' call below.
// 'AbandonedHandler.run()' keeps looping while this field is non-null.
1447 // create a single instance of 'AbandonedHandler', and start it
1448 private static AbandonedHandler abandonedHandler = new AbandonedHandler();
1451 abandonedHandler.start();
1454 /* ============================================================ */
1457 * This class handles the '/cmd/dumpLocks' REST command.
1459 static class DumpLocks {
1460 // indicates whether a more detailed dump should be done
1461 private boolean detail;
1463 // this table maps the 'TargetLock' UUID into an object containing
1464 // both client (LocalLocks) and server (GlobalLocks) information
1465 private Map<UUID, MergedData> mergedDataMap =
1466 new TreeMap<>(Util.uuidComparator);
1468 // this table maps the 'TargetLock' key into the associated 'LockEntry'
1470 private Map<String, LockEntry> lockEntries = new TreeMap<>();
1472 // this table maps the 'TargetLock' key into entries that only exist
1473 // on the client end
1474 private Map<String, MergedData> clientOnlyEntries = new TreeMap<>();
1476 // display format (although it is now dynamically adjusted)
1477 private String format = "%-14s %-14s %-36s %-10s %s\n";
1479 // calculation of maximum key length for display
1480 private int keyLength = 10;
1482 // calculation of maximum owner key length for display
1483 private int ownerKeyLength = 10;
1485 // 'true' if any comments need to be displayed (affects format)
1486 private boolean commentsIncluded = false;
1489 * Entry point for the '/cmd/dumpLocks' REST command.
1491 * @param out where the output should be displayed
1492 * @param detail 'true' provides additional bucket and host information
1493 * (but abbreviates all UUIDs in order to avoid excessive
1496 static void dumpLocks(PrintStream out, boolean detail)
1497 throws InterruptedException, IOException, ClassNotFoundException {
1499 // the actual work is done in the constructor
1500 new DumpLocks(out, detail);
1504 * Entry point for the '/lock/dumpLocksData' REST command, which generates
1505 * a byte stream for this particular host.
1507 * @param serverUuid the UUID of the intended destination server
1508 * @param ttl similar to IP time-to-live -- it controls the number of hops
1509 * the message may take
1510 * @return a base64-encoded byte stream containing serialized 'HostData'
1512 static byte[] dumpLocksData(UUID serverUuid, int ttl) throws IOException {
1513 if (!Server.getThisServer().getUuid().equals(serverUuid)) {
1516 Server server = Server.getServer(serverUuid);
1517 if (server != null) {
1518 WebTarget webTarget =
1519 server.getWebTarget("lock/dumpLocksData");
1520 if (webTarget != null) {
1521 logger.info("Forwarding 'lock/dumpLocksData' to uuid {}",
1524 .queryParam(QP_SERVER, serverUuid.toString())
1525 .queryParam(QP_TTL, String.valueOf(ttl))
1526 .request().get(byte[].class);
1531 // if we reach this point, we didn't forward for some reason
1533 logger.error("Couldn't forward 'lock/dumpLocksData to uuid {}",
1538 return Base64.getEncoder().encode(Util.serialize(new HostData()));
1542 * Constructor - does the '/cmd/dumpLocks' REST command.
1544 * @param out where the output should be displayed
1546 DumpLocks(PrintStream out, boolean detail)
1547 throws IOException, InterruptedException, ClassNotFoundException {
1549 this.detail = detail;
1551 // receives responses from '/lock/dumpLocksData'
1552 final LinkedTransferQueue<Response> responseQueue =
1553 new LinkedTransferQueue<>();
1555 // generate a count of the number of external servers that should respond
1556 int pendingResponseCount = 0;
1558 // iterate over all of the servers
1559 for (final Server server : Server.getServers()) {
1560 if (server == Server.getThisServer()) {
1561 // skip this server -- we will handle it differently
1565 // keep a running count
1566 pendingResponseCount += 1;
1567 server.post("lock/dumpLocksData", null, new Server.PostResponse() {
1569 public WebTarget webTarget(WebTarget webTarget) {
1571 .queryParam(QP_SERVER, server.getUuid().toString())
1572 .queryParam(QP_TTL, timeToLive);
1576 public void response(Response response) {
1577 // responses are queued, and the main thread will collect them
1578 responseQueue.put(response);
1583 // this handles data associated with this server -- it also goes through
1584 // serialization/deserialization, which provides a deep copy of the data
1585 populateLockData(dumpLocksData(Server.getThisServer().getUuid(), 0));
1587 // now, poll for responses from all of the the other servers
1588 while (pendingResponseCount > 0) {
1589 pendingResponseCount -= 1;
1590 Response response = responseQueue.poll(60, TimeUnit.SECONDS);
1591 if (response == null) {
1592 // timeout -- we aren't expecting any more responses
1596 // populate data associated with this server
1597 populateLockData(response.readEntity(byte[].class));
1600 // we have processed all of the servers that we are going to,
1601 // now generate the output
1606 * process base64-encoded data from a server (local or remote).
1608 * @param data base64-encoded data (class 'HostData')
1610 void populateLockData(byte[] data) throws IOException, ClassNotFoundException {
1611 Object decodedData = Util.deserialize(Base64.getDecoder().decode(data));
1612 if (decodedData instanceof HostData) {
1613 // deserialized data
1614 HostData hostData = (HostData)decodedData;
1616 // fetch 'Server' instance associated with the responding server
1617 Server server = Server.getServer(hostData.hostUuid);
1619 // process the client-end data
1620 for (ClientData clientData : hostData.clientDataList) {
1621 populateLockData_clientData(clientData, server);
1624 // process the server-end data
1625 for (ServerData serverData : hostData.serverDataList) {
1626 populateLockData_serverData(serverData, server);
1629 logger.error("TargetLock.DumpLocks.populateLockData: "
1630 + "received data has class {}",
1631 decodedData.getClass().getName());
1635 private void populateLockData_clientData(ClientData clientData, Server server) {
1636 // 'true' if the bucket associated with this 'ClientData'
1637 // doesn't belong to the remote server, as far as we can tell
1638 boolean serverMismatch =
1639 Bucket.bucketToServer(clientData.bucketNumber) != server;
1641 // each 'ClientDataRecord' instance corresponds to an
1642 // active 'Identity' (TargetLock) instance
1643 for (ClientDataRecord cdr : clientData.clientDataRecords) {
1644 // update maximum 'key' and 'ownerKey' lengths
1645 updateKeyLength(cdr.identity.key);
1646 updateOwnerKeyLength(cdr.identity.ownerKey);
1649 UUID uuid = cdr.identity.uuid;
1651 // fetch/generate 'MergeData' instance for this UUID
1652 MergedData md = mergedDataMap.get(uuid);
1654 md = new MergedData(uuid);
1655 mergedDataMap.put(uuid, md);
1658 // update 'MergedData.clientDataRecord'
1659 if (md.clientDataRecord == null) {
1660 md.clientDataRecord = cdr;
1662 md.comment("Duplicate client entry for UUID");
1665 if (serverMismatch) {
1666 // need to generate an additional error
1667 md.comment(server.toString()
1668 + "(client) does not own bucket "
1669 + clientData.bucketNumber);
1674 private void populateLockData_serverData(ServerData serverData, Server server) {
1675 // 'true' if the bucket associated with this 'ServerData'
1676 // doesn't belong to the remote server, as far as we can tell
1677 boolean serverMismatch =
1678 Bucket.bucketToServer(serverData.bucketNumber) != server;
1680 // each 'LockEntry' instance corresponds to the current holder
1681 // of a lock, and all requestors waiting for it to be freed
1682 for (LockEntry le : serverData.globalLocks.keyToEntry.values()) {
1683 // update maximum 'key' and 'ownerKey' lengths
1684 updateKeyLength(le.key);
1685 updateOwnerKeyLength(le.currentOwnerKey);
1688 UUID uuid = le.currentOwnerUuid;
1690 // fetch/generate 'MergeData' instance for this UUID
1691 MergedData md = mergedDataMap.get(uuid);
1693 md = new MergedData(uuid);
1694 mergedDataMap.put(uuid, md);
1697 // update 'lockEntries' table entry
1698 if (lockEntries.get(le.key) != null) {
1699 md.comment("Duplicate server entry for key " + le.key);
1701 lockEntries.put(le.key, le);
1704 // update 'MergedData.serverLockEntry'
1705 // (leave 'MergedData.serverWaiting' as 'null', because
1706 // this field is only used for waiting entries)
1707 if (md.serverLockEntry == null) {
1708 md.serverLockEntry = le;
1710 md.comment("Duplicate server entry for UUID");
1713 if (serverMismatch) {
1714 // need to generate an additional error
1715 md.comment(server.toString()
1716 + "(server) does not own bucket "
1717 + serverData.bucketNumber);
1720 // we need 'MergeData' entries for all waiting requests
1721 for (Waiting waiting : le.waitingList) {
1722 populateLockData_serverData_waiting(
1723 serverData, server, serverMismatch, le, waiting);
1728 private void populateLockData_serverData_waiting(
1729 ServerData serverData, Server server, boolean serverMismatch,
1730 LockEntry le, Waiting waiting) {
1732 // update maximum 'ownerKey' length
1733 updateOwnerKeyLength(waiting.ownerKey);
1736 UUID uuid = waiting.ownerUuid;
1738 // fetch/generate 'MergeData' instance for this UUID
1739 MergedData md = mergedDataMap.get(uuid);
1741 md = new MergedData(uuid);
1742 mergedDataMap.put(uuid, md);
1745 // update 'MergedData.serverLockEntry' and
1746 // 'MergedData.serverWaiting'
1747 if (md.serverLockEntry == null) {
1748 md.serverLockEntry = le;
1749 md.serverWaiting = waiting;
1751 md.comment("Duplicate server entry for UUID");
1754 if (serverMismatch) {
1755 // need to generate an additional error
1756 md.comment(server.toString()
1757 + "(server) does not own bucket "
1758 + serverData.bucketNumber);
1763 * Do some additional sanity checks on the 'MergedData', and then
1764 * display all of the results.
1766 * @param out where the output should be displayed
1768 void dump(PrintStream out) {
1769 // iterate over the 'MergedData' instances looking for problems
1770 for (MergedData md : mergedDataMap.values()) {
1771 if (md.clientDataRecord == null) {
1772 md.comment("Client data missing");
1773 } else if (md.serverLockEntry == null) {
1774 md.comment("Server data missing");
1775 clientOnlyEntries.put(md.clientDataRecord.identity.key, md);
1776 } else if (!md.clientDataRecord.identity.key.equals(md.serverLockEntry.key)) {
1777 md.comment("Client key(" + md.clientDataRecord.identity.key
1778 + ") server key(" + md.serverLockEntry.key
1781 String serverOwnerKey = (md.serverWaiting == null
1782 ? md.serverLockEntry.currentOwnerKey : md.serverWaiting.ownerKey);
1783 if (!md.clientDataRecord.identity.ownerKey.equals(serverOwnerKey)) {
1784 md.comment("Client owner key("
1785 + md.clientDataRecord.identity.ownerKey
1786 + ") server owner key(" + serverOwnerKey
1789 // TBD: test for state mismatch
1794 // generate format based upon the maximum key length, maximum
1795 // owner key length, and whether comments are included anywhere
1796 format = "%-" + keyLength + "s %6s %-9s %-" + ownerKeyLength
1797 + "s %6s %-9s %-9s %-10s" + (commentsIncluded ? " %s\n" : "\n");
1799 // dump out the header
1800 out.printf(format, "Key", "Bucket", "Host UUID",
1801 "Owner Key", "Bucket", "Host UUID",
1802 "Lock UUID", "State", "Comments");
1803 out.printf(format, "---", "------", "---------",
1804 "---------", "------", "---------",
1805 "---------", "-----", "--------");
1807 // generate format based upon the maximum key length, maximum
1808 // owner key length, and whether comments are included anywhere
1809 format = "%-" + keyLength + "s %-" + ownerKeyLength
1810 + "s %-36s %-10s" + (commentsIncluded ? " %s\n" : "\n");
1812 // dump out the header
1813 out.printf(format, "Key", "Owner Key", "UUID", "State", "Comments");
1814 out.printf(format, "---", "---------", "----", "-----", "--------");
1817 dump_serverTable(out);
1818 dump_clientOnlyEntries(out);
1821 private void dump_serverTable(PrintStream out) {
1822 // iterate over the server table
1823 for (LockEntry le : lockEntries.values()) {
1824 // fetch merged data
1825 MergedData md = mergedDataMap.get(le.currentOwnerUuid);
1827 // dump out record associated with lock owner
1830 le.key, getBucket(le.key), bucketOwnerUuid(le.key),
1831 le.currentOwnerKey, getBucket(le.currentOwnerKey),
1832 bucketOwnerUuid(le.currentOwnerKey),
1833 abbrevUuid(le.currentOwnerUuid),
1834 md.getState(), md.firstComment());
1837 le.key, le.currentOwnerKey, le.currentOwnerUuid,
1838 md.getState(), md.firstComment());
1840 dumpMoreComments(out, md);
1842 // iterate over all requests waiting for this lock
1843 for (Waiting waiting: le.waitingList) {
1844 // fetch merged data
1845 md = mergedDataMap.get(waiting.ownerUuid);
1847 // dump out record associated with waiting request
1851 waiting.ownerKey, getBucket(waiting.ownerKey),
1852 bucketOwnerUuid(waiting.ownerKey),
1853 abbrevUuid(waiting.ownerUuid),
1854 md.getState(), md.firstComment());
1856 out.printf(format, "", waiting.ownerKey, waiting.ownerUuid,
1857 md.getState(), md.firstComment());
1859 dumpMoreComments(out, md);
1864 private void dump_clientOnlyEntries(PrintStream out) {
1865 // client records that don't have matching server entries
1866 for (MergedData md : clientOnlyEntries.values()) {
1867 ClientDataRecord cdr = md.clientDataRecord;
1870 cdr.identity.key, getBucket(cdr.identity.key),
1871 bucketOwnerUuid(cdr.identity.key),
1872 cdr.identity.ownerKey,
1873 getBucket(cdr.identity.ownerKey),
1874 bucketOwnerUuid(cdr.identity.ownerKey),
1875 abbrevUuid(cdr.identity.uuid),
1876 md.getState(), md.firstComment());
1878 out.printf(format, cdr.identity.key, cdr.identity.ownerKey,
1879 cdr.identity.uuid, md.getState(), md.firstComment());
1881 dumpMoreComments(out, md);
1886 * This method converts a String keyword into the corresponding bucket
1889 * @param key the keyword to be converted
1890 * @return the bucket number
1892 private static int getBucket(String key) {
1893 return Bucket.bucketNumber(key);
1897 * Determine the abbreviated UUID associated with a key.
1899 * @param key the keyword to be converted
1900 * @return the abbreviated UUID of the bucket owner
1902 private static String bucketOwnerUuid(String key) {
1904 Bucket bucket = Bucket.getBucket(Bucket.bucketNumber(key));
1906 // fetch the bucket owner (may be 'null' if unassigned)
1907 Server owner = bucket.getOwner();
1909 return owner == null ? "NONE" : abbrevUuid(owner.getUuid());
1913 * Convert a UUID to an abbreviated form, which is the
1914 * first 8 hex digits of the UUID, followed by the character '*'.
1916 * @param uuid the UUID to convert
1917 * @return the abbreviated form
1919 private static String abbrevUuid(UUID uuid) {
1920 return uuid.toString().substring(0, 8) + "*";
1924 * If the 'MergedData' instance has more than one comment,
1925 * dump out comments 2-n.
1927 * @param out where the output should be displayed
1928 * @param md the MergedData instance
1930 void dumpMoreComments(PrintStream out, MergedData md) {
1931 if (md.comments.size() > 1) {
1932 Queue<String> comments = new LinkedList<>(md.comments);
1934 // remove the first entry, because it has already been displayed
1936 for (String comment : comments) {
1938 out.printf(format, "", "", "", "", "", "", "", "", comment);
1940 out.printf(format, "", "", "", "", comment);
1947 * Check the length of the specified 'key', and update 'keyLength' if
1948 * it exceeds the current maximum.
1950 * @param key the key to be tested
1952 void updateKeyLength(String key) {
1953 int length = key.length();
1954 if (length > keyLength) {
1960 * Check the length of the specified 'ownerKey', and update
1961 * 'ownerKeyLength' if it exceeds the current maximum.
1963 * @param ownerKey the owner key to be tested
1965 void updateOwnerKeyLength(String ownerKey) {
1966 int length = ownerKey.length();
1967 if (length > ownerKeyLength) {
1968 ownerKeyLength = length;
1972 /* ============================== */
1975 * Each instance of this class corresponds to client and/or server
1976 * data structures, and is used to check consistency between the two.
1979 // the client/server UUID
1982 // client-side data (from LocalLocks)
1983 ClientDataRecord clientDataRecord = null;
1985 // server-side data (from GlobalLocks)
1986 LockEntry serverLockEntry = null;
1987 Waiting serverWaiting = null;
1989 // detected problems, such as server/client mismatches
1990 Queue<String> comments = new LinkedList<String>();
1993 * Constructor - initialize the 'uuid'.
1995 * @param uuid the UUID that identifies the original 'TargetLock'
1997 MergedData(UUID uuid) {
2002 * add a comment to the list, and indicate that there are now
2005 * @param co the comment to add
2007 void comment(String co) {
2009 commentsIncluded = true;
2013 * Return the first comment, or an empty string if there are no
2016 * @return the first comment, or an empty string if there are no
2017 * comments (useful for formatting output).
2019 String firstComment() {
2020 return comments.isEmpty() ? "" : comments.poll();
2024 * Return a string description of the state.
2026 * @return a string description of the state.
2029 return clientDataRecord == null
2030 ? "MISSING" : clientDataRecord.state.toString();
2035 * This class contains all of the data sent from each host to the
2036 * host that is consolidating the information for display.
2038 static class HostData implements Serializable {
2039 // the UUID of the host sending the data
2040 private UUID hostUuid;
2042 // all of the information derived from the 'LocalLocks' data
2043 private List<ClientData> clientDataList;
2045 // all of the information derived from the 'GlobalLocks' data
2046 private List<ServerData> serverDataList;
2049 * Constructor - this goes through all of the lock tables,
2050 * and populates 'clientDataList' and 'serverDataList'.
2054 hostUuid = Server.getThisServer().getUuid();
2056 // initial storage for client and server data
2057 clientDataList = new ArrayList<ClientData>();
2058 serverDataList = new ArrayList<ServerData>();
2060 // go through buckets
2061 for (int i = 0 ; i < Bucket.BUCKETCOUNT ; i += 1) {
2062 Bucket bucket = Bucket.getBucket(i);
2065 LocalLocks localLocks =
2066 bucket.getAdjunctDontCreate(LocalLocks.class);
2067 if (localLocks != null) {
2068 // we have client data for this bucket
2069 ClientData clientData = new ClientData(i);
2070 clientDataList.add(clientData);
2072 synchronized (localLocks) {
2073 for (WeakReference<TargetLock> wr :
2074 localLocks.weakReferenceToIdentity.keySet()) {
2075 // Note: 'targetLock' may be 'null' if it has
2076 // been abandoned, and garbage collected
2077 TargetLock targetLock = wr.get();
2079 // fetch associated 'identity'
2081 localLocks.weakReferenceToIdentity.get(wr);
2082 if (identity != null) {
2083 // add a new 'ClientDataRecord' for this bucket
2084 clientData.clientDataRecords.add(
2085 new ClientDataRecord(identity,
2086 (targetLock == null ? null :
2087 targetLock.getState())));
2094 GlobalLocks globalLocks =
2095 bucket.getAdjunctDontCreate(GlobalLocks.class);
2096 if (globalLocks != null) {
2097 // server data is already in serializable form
2098 serverDataList.add(new ServerData(i, globalLocks));
2105 * Information derived from the 'LocalLocks' adjunct to a single bucket.
2107 static class ClientData implements Serializable {
2108 // number of the bucket
2109 private int bucketNumber;
2111 // all of the client locks within this bucket
2112 private List<ClientDataRecord> clientDataRecords;
2115 * Constructor - initially, there are no 'clientDataRecords'.
2117 * @param bucketNumber the bucket these records are associated with
2119 ClientData(int bucketNumber) {
2120 this.bucketNumber = bucketNumber;
2121 clientDataRecords = new ArrayList<>();
2126 * This corresponds to the information contained within a
2127 * single 'TargetLock'.
2129 static class ClientDataRecord implements Serializable {
2130 // contains key, ownerKey, uuid
2131 private Identity identity;
2133 // state field of 'TargetLock'
2134 // (may be 'null' if there is no 'TargetLock')
2135 private State state;
2138 * Constructor - initialize the fields.
2140 * @param identity contains key, ownerKey, uuid
2141 * @param state the state if the 'TargetLock' exists, and 'null' if it
2142 * has been garbage collected
2144 ClientDataRecord(Identity identity, State state) {
2145 this.identity = identity;
2151 * Information derived from the 'GlobalLocks' adjunct to a single bucket.
2153 static class ServerData implements Serializable {
2154 // number of the bucket
2155 private int bucketNumber;
2157 // server-side data associated with a single bucket
2158 private GlobalLocks globalLocks;
2161 * Constructor - initialize the fields.
2163 * @param bucketNumber the bucket these records are associated with
2164 * @param globalLocks GlobalLocks instance associated with 'bucketNumber'
2166 ServerData(int bucketNumber, GlobalLocks globalLocks) {
2167 this.bucketNumber = bucketNumber;
2168 this.globalLocks = globalLocks;
2173 /* ============================================================ */
2176 * Instances of 'AuditData' are passed between servers as part of the
2177 * 'TargetLock' audit.
2179 static class AuditData implements Serializable {
2181 private UUID hostUuid;
2183 // client records that currently exist, or records to be cleared
2184 // (depending upon message) -- client/server is from the senders side
2185 private List<Identity> clientData;
2187 // server records that currently exist, or records to be cleared
2188 // (depending upon message) -- client/server is from the senders side
2189 private List<Identity> serverData;
/**
 * Constructor - set 'hostUuid' to the current host, and start with
 * empty 'clientData' and 'serverData' lists.
 */
AuditData() {
    hostUuid = Server.getThisServer().getUuid();
    clientData = new ArrayList<>();
    serverData = new ArrayList<>();
}
2202 * This is called when we receive an incoming 'AuditData' object from
2205 * @param includeWarnings if 'true', generate warning messages
2207 * @return an 'AuditData' instance that only contains records we
2210 AuditData generateResponse(boolean includeWarnings) {
2211 AuditData response = new AuditData();
2213 // compare remote servers client data with our server data
2214 generateResponse_clientEnd(response, includeWarnings);
2217 generateResponse_serverEnd(response, includeWarnings);
2222 private void generateResponse_clientEnd(AuditData response, boolean includeWarnings) {
2223 for (Identity identity : clientData) {
2224 // remote end is the client, and we are the server
2225 Bucket bucket = Bucket.getBucket(identity.key);
2226 GlobalLocks globalLocks =
2227 bucket.getAdjunctDontCreate(GlobalLocks.class);
2229 if (globalLocks != null) {
2230 Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry;
2231 synchronized (keyToEntry) {
2232 LockEntry le = keyToEntry.get(identity.key);
2234 if (identity.uuid.equals(le.currentOwnerUuid)
2235 && identity.ownerKey.equals(le.currentOwnerKey)) {
2240 // check the waiting list
2241 boolean match = false;
2242 for (Waiting waiting : le.waitingList) {
2243 if (identity.uuid.equals(waiting.ownerUuid)
2244 && identity.ownerKey.equals(waiting.ownerKey)) {
2245 // we found a match on the waiting list
2251 // there was a match on the waiting list
2258 // If we reach this point, a match was not confirmed. Note that it
2259 // is possible that we got caught in a transient state, so we need
2260 // to somehow make sure that we don't "correct" a problem that
2263 if (includeWarnings) {
2264 logger.warn("TargetLock audit issue: server match not found "
2265 + "(key={},ownerKey={},uuid={})",
2266 identity.key, identity.ownerKey, identity.uuid);
2269 // it was 'clientData' to the sender, but 'serverData' to us
2270 response.serverData.add(identity);
2274 private void generateResponse_serverEnd(AuditData response, boolean includeWarnings) {
2275 for (Identity identity : serverData) {
2276 // remote end is the server, and we are the client
2277 Bucket bucket = Bucket.getBucket(identity.ownerKey);
2278 LocalLocks localLocks =
2279 bucket.getAdjunctDontCreate(LocalLocks.class);
2280 if (localLocks != null) {
2281 synchronized (localLocks) {
2282 WeakReference<TargetLock> wr =
2283 localLocks.uuidToWeakReference.get(identity.uuid);
2285 Identity identity2 =
2286 localLocks.weakReferenceToIdentity.get(wr);
2287 if (identity2 != null
2288 && identity.key.equals(identity2.key)
2289 && identity.ownerKey.equals(identity2.ownerKey)) {
2297 // If we reach this point, a match was not confirmed. Note that it
2298 // is possible that we got caught in a transient state, so we need
2299 // to somehow make sure that we don't "correct" a problem that
2301 if (includeWarnings) {
2302 logger.warn("TargetLock audit issue: client match not found "
2303 + "(key={},ownerKey={},uuid={})",
2304 identity.key, identity.ownerKey, identity.uuid);
2306 response.clientData.add(identity);
2311 * The response messages contain 'Identity' objects that match those
2312 * in our outgoing '/lock/audit' message, but that the remote end could
2313 * not confirm. Again, the definition of 'client' and 'server' are
2314 * the remote ends' version.
2316 * @param server the server we sent the request to
2318 void processResponse(Server server) {
2319 if (clientData.isEmpty() && serverData.isEmpty()) {
2321 logger.info("TargetLock audit with {} completed -- no mismatches",
2326 // There are still mismatches -- note that 'clientData' and
2327 // 'serverData' are from the remote end's perspective, which is the
2328 // opposite of this end
2330 for (Identity identity : clientData) {
2331 // these are on our server end -- we were showing a lock on this
2332 // end, but the other end has no such client
2333 logger.error("Audit mismatch (GlobalLocks): (key={},owner={},uuid={})",
2334 identity.key, identity.ownerKey, identity.uuid);
2337 GlobalLocks.get(identity.key).unlock(identity.key, identity.uuid);
2339 for (Identity identity : serverData) {
2340 // these are on our client end
2341 logger.error("Audit mismatch (LocalLocks): (key={},owner={},uuid={})",
2342 identity.key, identity.ownerKey, identity.uuid);
2344 // clean up 'LocalLocks' tables
2345 LocalLocks localLocks = LocalLocks.get(identity.ownerKey);
2346 TargetLock targetLock = null;
2347 synchronized (localLocks) {
2348 WeakReference<TargetLock> wr =
2349 localLocks.uuidToWeakReference.get(identity.uuid);
2351 targetLock = wr.get();
2352 localLocks.weakReferenceToIdentity.remove(wr);
2353 localLocks.uuidToWeakReference
2354 .remove(identity.uuid);
2359 if (targetLock != null) {
2360 // may need to update state
2361 synchronized (targetLock) {
2362 if (targetLock.state != State.FREE) {
2363 targetLock.state = State.LOST;
2368 logger.info("TargetLock audit with {} completed -- {} mismatches",
2369 server, clientData.size() + serverData.size());
2373 * Serialize and base64-encode this 'AuditData' instance, so it can
2374 * be sent in a message.
2376 * @return a byte array, which can be decoded and deserialized at
2377 * the other end ('null' is returned if there were any problems)
2381 return Base64.getEncoder().encode(Util.serialize(this));
2382 } catch (IOException e) {
2383 logger.error("TargetLock.AuditData.encode Exception", e);
2389 * Base64-decode and deserialize a byte array.
2391 * @param encodedData a byte array encoded via 'AuditData.encode'
2392 * (typically on the remote end of a connection)
2393 * @return an 'AuditData' instance if decoding was successful,
2396 static AuditData decode(byte[] encodedData) {
2398 Object decodedData =
2399 Util.deserialize(Base64.getDecoder().decode(encodedData));
2400 if (decodedData instanceof AuditData) {
2401 return (AuditData)decodedData;
2404 "TargetLock.AuditData.decode returned instance of class {}",
2405 decodedData.getClass().getName());
2408 } catch (IOException | ClassNotFoundException e) {
2409 logger.error("TargetLock.AuditData.decode Exception", e);
2416 * This class contains methods that control the audit. Also, sn instance of
2417 * 'Audit' is created for each audit that is in progress.
2419 static class Audit {
2420 // if non-null, it means that we have a timer set that periodicall
2421 // triggers the audit
2422 static TimerTask timerTask = null;
2424 // maps 'Server' to audit data associated with that server
2425 Map<Server,AuditData> auditMap = new IdentityHashMap<>();
2428 * Run a single audit cycle.
2430 static void runAudit() {
2431 logger.info("Starting TargetLock audit");
2432 Audit audit = new Audit();
2434 // populate 'auditMap' table
2437 // send to all of the servers in 'auditMap' (may include this server)
2442 * Schedule the audit to run periodically based upon defined properties.
2444 static void scheduleAudit() {
2445 scheduleAudit(auditPeriod, auditGracePeriod);
2449 * Schedule the audit to run periodically -- all of the hosts arrange to
2450 * run their audit at a different time, evenly spaced across the audit
2453 * @param period how frequently to run the audit, in milliseconds
2454 * @param gracePeriod ensure that the audit doesn't run until at least
2455 * 'gracePeriod' milliseconds have passed from the current time
2457 static synchronized void scheduleAudit(final long period, final long gracePeriod) {
2459 if (timerTask != null) {
2460 // cancel current timer
2465 // this needs to run in the 'MainLoop' thread, because it is dependent
2466 // upon the list of servers, and our position in this list
2467 MainLoop.queueWork(() -> {
2468 // this runs in the 'MainLoop' thread
2470 // current list of servers
2471 Collection<Server> servers = Server.getServers();
2473 // count of the number of servers
2474 int count = servers.size();
2476 // will contain our position in this list
2480 Server thisServer = Server.getThisServer();
2482 for (Server server : servers) {
2483 if (server == thisServer) {
2489 // if index == count, we didn't find this server
2490 // (which shouldn't happen)
2492 if (index < count) {
2493 // The servers are ordered by UUID, and 'index' is this
2494 // server's position within the list. Suppose the period is
2495 // 60000 (60 seconds), and there are 5 servers -- the first one
2496 // will run the audit at 0 seconds after the minute, the next
2497 // at 12 seconds after the minute, then 24, 36, 48.
2498 long offset = (period * index) / count;
2500 // the earliest time we want the audit to run
2501 long time = System.currentTimeMillis() + gracePeriod;
2502 long startTime = time - (time % period) + offset;
2503 if (startTime <= time) {
2504 startTime += period;
2506 synchronized (Audit.class) {
2507 if (timerTask != null) {
2510 timerTask = new TimerTask() {
2517 // now, schedule the timer
2518 Util.timer.scheduleAtFixedRate(
2519 timerTask, new Date(startTime), period);
2526 * Handle an incoming '/lock/audit' message.
2528 * @param serverUuid the UUID of the intended destination server
2529 * @param ttl similar to IP time-to-live -- it controls the number of hops
2530 * @param data base64-encoded data, containing a serialized 'AuditData'
2532 * @return a serialized and base64-encoded 'AuditData' response
2534 static byte[] incomingAudit(UUID serverUuid, int ttl, byte[] encodedData) {
2535 if (!Server.getThisServer().getUuid().equals(serverUuid)) {
2538 Server server = Server.getServer(serverUuid);
2539 if (server != null) {
2540 WebTarget webTarget = server.getWebTarget("lock/audit");
2541 if (webTarget != null) {
2542 logger.info("Forwarding 'lock/audit' to uuid {}",
2544 Entity<String> entity =
2545 Entity.entity(new String(encodedData),
2546 MediaType.APPLICATION_OCTET_STREAM_TYPE);
2548 .queryParam(QP_SERVER, serverUuid.toString())
2549 .queryParam(QP_TTL, String.valueOf(ttl))
2550 .request().post(entity, byte[].class);
2555 // if we reach this point, we didn't forward for some reason
2557 logger.error("Couldn't forward 'lock/audit to uuid {}", serverUuid);
2561 AuditData auditData = AuditData.decode(encodedData);
2562 if (auditData != null) {
2563 AuditData auditResp = auditData.generateResponse(true);
2564 return auditResp.encode();
2570 * This method populates the 'auditMap' table by going through all of
2571 * the client and server lock data, and sorting it according to the
2575 for (int i = 0 ; i < Bucket.BUCKETCOUNT ; i += 1) {
2576 Bucket bucket = Bucket.getBucket(i);
2579 build_clientData(bucket);
2582 build_serverData(bucket);
2586 private void build_clientData(Bucket bucket) {
2588 LocalLocks localLocks =
2589 bucket.getAdjunctDontCreate(LocalLocks.class);
2590 if (localLocks != null) {
2591 synchronized (localLocks) {
2592 // we have client data for this bucket
2593 for (Identity identity :
2594 localLocks.weakReferenceToIdentity.values()) {
2595 // find or create the 'AuditData' instance associated
2596 // with the server owning the 'key'
2597 AuditData auditData = getAuditData(identity.key);
2598 if (auditData != null) {
2599 auditData.clientData.add(identity);
2606 private void build_serverData(Bucket bucket) {
2608 GlobalLocks globalLocks =
2609 bucket.getAdjunctDontCreate(GlobalLocks.class);
2610 if (globalLocks != null) {
2611 // we have server data for this bucket
2612 Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry;
2613 synchronized (keyToEntry) {
2614 for (LockEntry le : keyToEntry.values()) {
2615 // find or create the 'AuditData' instance associated
2616 // with the current 'ownerKey'
2617 AuditData auditData = getAuditData(le.currentOwnerKey);
2618 if (auditData != null) {
2619 // create an 'Identity' entry, and add it to
2620 // the list associated with the remote server
2621 auditData.serverData.add(
2622 new Identity(le.key, le.currentOwnerKey,
2623 le.currentOwnerUuid));
2626 for (Waiting waiting : le.waitingList) {
2627 // find or create the 'AuditData' instance associated
2628 // with the waiting entry 'ownerKey'
2629 auditData = getAuditData(waiting.ownerKey);
2630 if (auditData != null) {
2631 // create an 'Identity' entry, and add it to
2632 // the list associated with the remote server
2633 auditData.serverData.add(
2634 new Identity(le.key, waiting.ownerKey,
2635 waiting.ownerUuid));
2644 * Find or create the 'AuditData' structure associated with a particular
2647 AuditData getAuditData(String key) {
2648 // map 'key -> bucket number', and then 'bucket number' -> 'server'
2649 Server server = Bucket.bucketToServer(Bucket.bucketNumber(key));
2650 if (server != null) {
2651 AuditData auditData =
2652 auditMap.computeIfAbsent(server, sk -> new AuditData());
2656 // this happens when the bucket has not been assigned to a server yet
2661 * Using the collected 'auditMap', send out the messages to all of the
2665 if (auditMap.isEmpty()) {
2666 logger.info("TargetLock audit: no locks on this server");
2668 logger.info("TargetLock audit: sending audit information to {}",
2672 for (final Server server : auditMap.keySet()) {
2673 send_server(server);
2677 private void send_server(final Server server) {
2679 AuditData auditData = auditMap.get(server);
2681 if (server == Server.getThisServer()) {
2682 // process this locally
2683 final AuditData respData = auditData.generateResponse(true);
2684 if (respData.clientData.isEmpty()
2685 && respData.serverData.isEmpty()) {
2687 logger.info("TargetLock.Audit.send: "
2688 + "no errors from self ({})", server);
2692 // do the rest in a separate thread
2693 server.getThreadPool().execute(() -> {
2694 // wait a few seconds, and see if we still know of these
2696 if (AuditPostResponse.responseSupport(
2697 respData, "self (" + server + ")",
2698 "TargetLock.Audit.send")) {
2699 // a return falue of 'true' either indicates the
2700 // mismatches were resolved after a retry, or we
2701 // received an interrupt, and need to abort
2705 // any mismatches left in 'respData' are still issues
2706 respData.processResponse(server);
2712 byte[] encodedData = auditData.encode();
2713 if (encodedData == null) {
2714 // error has already been displayed
2719 Entity<String> entity =
2720 Entity.entity(new String(encodedData),
2721 MediaType.APPLICATION_OCTET_STREAM_TYPE);
2723 server.post("lock/audit", entity, new AuditPostResponse(server));
2727 static class AuditPostResponse implements Server.PostResponse {
2728 private Server server;
/**
 * Constructor - remember the server that the 'lock/audit' message
 * is being posted to.
 *
 * @param server the destination server
 */
AuditPostResponse(Server server) {
    this.server = server;
}
2735 public WebTarget webTarget(WebTarget webTarget) {
2736 // include the 'uuid' keyword
2738 .queryParam(QP_SERVER, server.getUuid().toString())
2739 .queryParam(QP_TTL, timeToLive);
2743 public void response(Response response) {
2744 // process the response here
2745 AuditData respData =
2746 AuditData.decode(response.readEntity(byte[].class));
2747 if (respData == null) {
2748 logger.error("TargetLock.Audit.send: "
2749 + "couldn't process response from {}",
2754 // if we reach this point, we got a response
2755 if (respData.clientData.isEmpty()
2756 && respData.serverData.isEmpty()) {
2758 logger.info("TargetLock.Audit.send: "
2759 + "no errors from {}", server);
2763 // wait a few seconds, and see if we still know of these
2765 if (responseSupport(respData, server, "AuditPostResponse.response")) {
2766 // a return falue of 'true' either indicates the mismatches
2767 // were resolved after a retry, or we received an interrupt,
2768 // and need to abort
2772 // any mismatches left in 'respData' are still there --
2773 // hopefully, they are transient issues on the other side
2774 AuditData auditData = new AuditData();
2775 auditData.clientData = respData.serverData;
2776 auditData.serverData = respData.clientData;
2779 byte[] encodedData = auditData.encode();
2780 if (encodedData == null) {
2781 // error has already been displayed
2786 Entity<String> entity =
2787 Entity.entity(new String(encodedData),
2788 MediaType.APPLICATION_OCTET_STREAM_TYPE);
2790 // send new list to other end
2792 .getWebTarget("lock/audit")
2793 .queryParam(QP_SERVER, server.getUuid().toString())
2794 .queryParam(QP_TTL, timeToLive)
2795 .request().post(entity);
2797 respData = AuditData.decode(response.readEntity(byte[].class));
2798 if (respData == null) {
2799 logger.error("TargetLock.auditDataBuilder.send: "
2800 + "couldn't process response from {}",
2805 // if there are mismatches left, they are presumably real
2806 respData.processResponse(server);
2809 // Handle mismatches indicated by an audit response -- a return value of
2810 // 'true' indicates that there were no mismatches after a retry, or
2811 // we received an interrupt. In either case, the caller returns.
2812 private static boolean responseSupport(AuditData respData, Object serverString, String caller) {
2813 logger.info("{}: mismatches from {}", caller, serverString);
2815 Thread.sleep(auditRetryDelay);
2816 } catch (InterruptedException e) {
2817 logger.error("{}: Interrupted handling audit response from {}",
2818 caller, serverString);
2820 Thread.currentThread().interrupt();
2824 // This will check against our own data -- any mismatches
2825 // mean that things have changed since we sent out the
2826 // first message. We will remove any mismatches from
2827 // 'respData', and see if there are any left.
2828 AuditData mismatches = respData.generateResponse(false);
2830 respData.serverData.removeAll(mismatches.clientData);
2831 respData.clientData.removeAll(mismatches.serverData);
2833 if (respData.clientData.isEmpty()
2834 && respData.serverData.isEmpty()) {
2836 // there must have been transient issues on our side
2837 logger.info("{}: no mismatches from {} after retry",
2838 caller, serverString);