2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.serverpool.persistence;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;

import org.kie.api.event.rule.ObjectDeletedEvent;
import org.kie.api.event.rule.ObjectInsertedEvent;
import org.kie.api.event.rule.ObjectUpdatedEvent;
import org.kie.api.event.rule.RuleRuntimeEventListener;
import org.kie.api.runtime.KieSession;
import org.onap.policy.drools.core.DroolsRunnable;
import org.onap.policy.drools.core.PolicyContainer;
import org.onap.policy.drools.core.PolicySession;
import org.onap.policy.drools.core.PolicySessionFeatureApi;
import org.onap.policy.drools.serverpool.Bucket;
import org.onap.policy.drools.serverpool.Keyword;
import org.onap.policy.drools.serverpool.Server;
import org.onap.policy.drools.serverpool.ServerPoolApi;
import org.onap.policy.drools.serverpool.TargetLock.GlobalLocks;
import org.onap.policy.drools.serverpool.Util;
import org.onap.policy.drools.system.PolicyControllerConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This class provides a persistence implementation for 'feature-server-pool',
 * backing up the data of selected Drools sessions and server-side 'TargetLock'
 * data on separate hosts.
 */
72 public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
73 private static Logger logger = LoggerFactory.getLogger(Persistence.class);
75 // HTTP query parameters
76 private static final String QP_BUCKET = "bucket";
77 private static final String QP_SESSION = "session";
78 private static final String QP_COUNT = "count";
79 private static final String QP_DEST = "dest";
81 /***************************************/
82 /* 'PolicySessionFeatureApi' interface */
83 /***************************************/
89 public int getSequenceNumber() {
97 public void newPolicySession(PolicySession policySession) {
98 // a new Drools session is being created -- look at the properties
99 // 'persistence.<session-name>.type' and 'persistence.type' to determine
100 // whether persistence is enabled for this session
102 // fetch properties file
103 PolicyContainer container = policySession.getPolicyContainer();
104 Properties properties = PolicyControllerConstants.getFactory().get(
105 container.getGroupId(), container.getArtifactId()).getProperties();
107 // look at 'persistence.<session-name>.type', and 'persistence.type'
108 String type = properties.getProperty("persistence." + policySession.getName() + ".type");
110 type = properties.getProperty("persistence.type");
113 if ("auto".equals(type) || "native".equals(type)) {
114 // persistence is enabled this session
115 policySession.setAdjunct(PersistenceRunnable.class,
116 new PersistenceRunnable(policySession));
120 /*****************************/
121 /* 'ServerPoolApi' interface */
122 /*****************************/
128 public Collection<Class<?>> servletClasses() {
129 // the nested class 'Rest' contains additional REST calls
130 List<Class<?>> classes = new LinkedList<>();
131 classes.add(Rest.class);
139 public void restoreBucket(Bucket bucket) {
140 // if we reach this point, no data was received from the old server, which
141 // means we just initialized, or we did not have a clean bucket migration
143 ReceiverBucketData rbd = bucket.removeAdjunct(ReceiverBucketData.class);
145 // there is backup data -- do a restore
146 rbd.restoreBucket(bucket);
154 public void lockUpdate(Bucket bucket, GlobalLocks globalLocks) {
155 // we received a notification from 'TargetLock' that 'GlobalLocks' data
156 // has changed (TBD: should any attempt be made to group updates that
157 // occur in close succession?)
159 sendLockDataToBackups(bucket, globalLocks);
166 public void auditBucket(Bucket bucket, boolean isOwner, boolean isBackup) {
168 // it may be that backup hosts have changed --
169 // send out lock and session data
171 // starting with lock data
172 GlobalLocks globalLocks =
173 bucket.getAdjunctDontCreate(GlobalLocks.class);
174 if (globalLocks != null) {
175 sendLockDataToBackups(bucket, globalLocks);
179 SenderBucketData sbd =
180 bucket.getAdjunctDontCreate(SenderBucketData.class);
183 // go through all of the sessions where we have persistent data
184 for (PolicySession session : sbd.sessionData.keySet()) {
185 Object obj = session.getAdjunct(PersistenceRunnable.class);
186 if (obj instanceof PersistenceRunnable) {
187 PersistenceRunnable pr = (PersistenceRunnable)obj;
188 synchronized (pr.modifiedBuckets) {
189 // mark bucket associated with this session
191 pr.modifiedBuckets.add(bucket);
197 } else if (bucket.removeAdjunct(SenderBucketData.class) != null) {
198 logger.warn("Bucket {}: Removed superfluous "
199 + "'SenderBucketData' adjunct",
202 if (!isBackup && bucket.removeAdjunct(ReceiverBucketData.class) != null) {
203 logger.warn("Bucket {}: Removed superfluous "
204 + "'ReceiverBucketData' adjunct",
210 * This method supports 'lockUpdate' -- it has been moved to a separate
211 * 'static' method, so it can also be called after restoring 'GlobalLocks',
212 * so it can be backed up on its new servers.
214 * @param bucket the bucket containing the 'GlobalLocks' adjunct
215 * @param globalLocks the 'GlobalLocks' adjunct
217 private static void sendLockDataToBackups(final Bucket bucket, final GlobalLocks globalLocks) {
218 final int bucketNumber = bucket.getIndex();
219 SenderBucketData sbd = bucket.getAdjunct(SenderBucketData.class);
222 // serialize the 'globalLocks' instance
223 ByteArrayOutputStream bos = new ByteArrayOutputStream();
225 ObjectOutputStream oos = new ObjectOutputStream(bos);
226 synchronized (globalLocks) {
227 // the 'GlobalLocks' instance and counter are tied together
228 oos.writeObject(globalLocks);
229 lockCount = sbd.getLockCountAndIncrement();
232 } catch (IOException e) {
233 logger.error("Persistence.LockUpdate({})", bucketNumber, e);
237 // convert to Base64, and generate an 'Entity' for the REST call
238 byte[] serializedData = Base64.getEncoder().encode(bos.toByteArray());
239 final Entity<String> entity =
240 Entity.entity(new String(serializedData),
241 MediaType.APPLICATION_OCTET_STREAM_TYPE);
242 final int count = lockCount;
244 // build list of backup servers
245 Set<Server> servers = new HashSet<>();
246 synchronized (bucket) {
247 servers.add(bucket.getPrimaryBackup());
248 servers.add(bucket.getSecondaryBackup());
250 for (final Server server : servers) {
251 if (server != null) {
252 // send out REST command
253 server.getThreadPool().execute(() -> {
254 WebTarget webTarget =
255 server.getWebTarget("persistence/lock");
256 if (webTarget != null) {
258 .queryParam(QP_BUCKET, bucketNumber)
259 .queryParam(QP_COUNT, count)
260 .queryParam(QP_DEST, server.getUuid())
261 .request().post(entity);
268 /* ============================================================ */
271 * One instance of this class exists for every Drools session that is
272 * being backed up. It implements the 'RuleRuntimeEventListener' interface,
273 * so it receives notifications of Drools object changes, and also implements
274 * the 'DroolsRunnable' interface, so it can run within the Drools session
275 * thread, which should reduce the chance of catching an object in a
278 static class PersistenceRunnable implements DroolsRunnable,
279 RuleRuntimeEventListener {
280 // this is the Drools session associated with this instance
281 private PolicySession session;
283 // this is the string "<groupId>:<artifactId>:<sessionName>"
284 private String encodedSessionName;
286 // the buckets in this session which have modifications that still
287 // need to be backed up
288 private Set<Bucket> modifiedBuckets = new HashSet<>();
291 * Constructor - save the session information, and start listing for
294 PersistenceRunnable(PolicySession session) {
295 PolicyContainer pc = session.getPolicyContainer();
297 this.session = session;
298 this.encodedSessionName =
299 pc.getGroupId() + ":" + pc.getArtifactId() + ":" + session.getName();
300 session.getKieSession().addEventListener(this);
303 /******************************/
304 /* 'DroolsRunnable' interface */
305 /******************************/
313 // save a snapshot of 'modifiedBuckets'
314 Set<Bucket> saveModifiedBuckets;
315 synchronized (modifiedBuckets) {
316 saveModifiedBuckets = new HashSet<>(modifiedBuckets);
317 modifiedBuckets.clear();
320 // iterate over all of the modified buckets, sending update data
321 // to all of the backup servers
322 for (Bucket bucket : saveModifiedBuckets) {
323 SenderBucketData sbd =
324 bucket.getAdjunctDontCreate(SenderBucketData.class);
326 // serialization occurs within the Drools session thread
327 SenderSessionBucketData ssbd = sbd.getSessionData(session);
328 byte[] serializedData =
329 ssbd.getLatestEncodedSerializedData();
330 final int count = ssbd.getCount();
331 final Entity<String> entity =
332 Entity.entity(new String(serializedData),
333 MediaType.APPLICATION_OCTET_STREAM_TYPE);
335 // build list of backup servers
336 Set<Server> servers = new HashSet<>();
337 synchronized (bucket) {
338 servers.add(bucket.getPrimaryBackup());
339 servers.add(bucket.getSecondaryBackup());
341 for (final Server server : servers) {
342 if (server != null) {
343 // send out REST command
344 server.getThreadPool().execute(() -> {
345 WebTarget webTarget =
346 server.getWebTarget("persistence/session");
347 if (webTarget != null) {
349 .queryParam(QP_BUCKET,
351 .queryParam(QP_SESSION,
353 .queryParam(QP_COUNT, count)
354 .queryParam(QP_DEST, server.getUuid())
355 .request().post(entity);
362 } catch (Exception e) {
363 logger.error("Persistence.PersistenceRunnable.run:", e);
367 /****************************************/
368 /* 'RuleRuntimeEventListener' interface */
369 /****************************************/
375 public void objectDeleted(ObjectDeletedEvent event) {
376 // determine Drools object that was deleted
377 Object object = event.getOldObject();
379 // determine keyword, if any
380 String keyword = Keyword.lookupKeyword(object);
381 if (keyword == null) {
382 // no keyword, so there is no associated bucket
386 // locate bucket and associated data
387 // (don't create adjunct if it isn't there -- there's nothing to delete)
388 Bucket bucket = Bucket.getBucket(keyword);
389 SenderBucketData sbd =
390 bucket.getAdjunctDontCreate(SenderBucketData.class);
392 // add bucket to 'modified' list
393 synchronized (modifiedBuckets) {
394 modifiedBuckets.add(bucket);
397 // update set of Drools objects in this bucket
398 sbd.getSessionData(session).objectDeleted(object);
400 // insert this 'DroolsRunnable' to do the backup (note that it
401 // may already be inserted from a previous update to this
402 // DroolsSession -- eventually, the rule will fire, and the 'run'
403 // method will be called)
404 session.getKieSession().insert(this);
412 public void objectInserted(ObjectInsertedEvent event) {
413 objectChanged(event.getObject());
420 public void objectUpdated(ObjectUpdatedEvent event) {
421 objectChanged(event.getObject());
425 * A Drools session object was either inserted or updated
426 * (both are treated the same way).
428 * @param object the object being inserted or updated
430 private void objectChanged(Object object) {
431 // determine keyword, if any
432 String keyword = Keyword.lookupKeyword(object);
433 if (keyword == null) {
434 // no keyword, so there is no associated bucket
438 // locate bucket and associated data
439 Bucket bucket = Bucket.getBucket(keyword);
440 SenderBucketData sbd = bucket.getAdjunct(SenderBucketData.class);
442 // add bucket to 'modified' list
443 synchronized (modifiedBuckets) {
444 modifiedBuckets.add(bucket);
447 // update set of Drools objects in this bucket
448 sbd.getSessionData(session).objectChanged(object);
450 // insert this 'DroolsRunnable' to do the backup (note that it
451 // may already be inserted from a previous update to this
452 // DroolsSession -- eventually, the rule will fire, and the 'run'
453 // method will be called)
454 session.getKieSession().insert(this);
458 /* ============================================================ */
461 * Per-session data for a single bucket on the sender's side.
463 static class SenderSessionBucketData {
464 // the set of all objects in the session associated with this bucket
465 Map<Object,Object> droolsObjects = new IdentityHashMap<>();
467 // used by the receiver to determine whether an update is really newer
470 // serialized base64 form of 'droolsObjects'
471 // (TBD: determine if we are getting any benefit from caching this)
472 byte[] encodedSerializedData = null;
474 // 'true' means that 'encodedSerializedData' is out-of-date
475 boolean rebuildNeeded = true;
478 * Notification that a Drools object associated with this bucket
481 * @param object the object that was deleted
483 synchronized void objectDeleted(Object object) {
484 if (droolsObjects.remove(object) != null) {
485 rebuildNeeded = true;
490 * Notification that a Drools object associated with this bucket
491 * was inserted or updated.
493 * @param object the object that was updated
495 synchronized void objectChanged(Object object) {
496 droolsObjects.put(object, object);
497 rebuildNeeded = true;
501 * Serialize and base64-encode the objects in this Drools session.
503 * @return a byte array containing the encoded serialized objects
505 synchronized byte[] getLatestEncodedSerializedData() {
508 // this should be run in the Drools session thread in order
509 // to avoid transient data
510 encodedSerializedData =
511 Base64.getEncoder().encode(Util.serialize(droolsObjects));
513 } catch (IOException e) {
514 logger.error("Persistence.SenderSessionBucketData."
515 + "getLatestEncodedSerializedData: ", e);
517 rebuildNeeded = false;
519 return encodedSerializedData;
523 * Return a counter that will be used for update comparison.
525 * @return the value of a counter that can be used to determine whether
526 * an update is really newer than the previous update
528 synchronized int getCount() {
533 /* ============================================================ */
536 * Data for a single bucket on the sender's side.
538 public static class SenderBucketData {
539 // maps session name into SenderSessionBucketData
540 Map<PolicySession, SenderSessionBucketData> sessionData =
541 new IdentityHashMap<>();
543 // used by the receiver to determine whether an update is really newer
547 * Create or fetch the 'SenderSessionBucketData' instance associated
548 * with the specified session.
550 * @param session the 'PolicySession' object
551 * @return the associated 'SenderSessionBucketData' instance
553 synchronized SenderSessionBucketData getSessionData(PolicySession session) {
554 return sessionData.computeIfAbsent(session, key -> new SenderSessionBucketData());
558 * Return a counter that will be used for update comparison.
560 * @return the value of a counter that can be used to determine whether
561 * an update is really newer than the previous update
563 int getLockCountAndIncrement() {
564 // note that this is synchronized using the 'GlobalLocks' instance
565 // within the same bucket
570 /* ============================================================ */
573 * Data for a single bucket and session on the receiver's side.
575 static class ReceiverSessionBucketData {
576 // used to determine whether an update is really newer
579 // serialized base64 form of 'droolsObjects'
580 byte[] encodedSerializedData = null;
583 /* ============================================================ */
586 * Data for a single bucket on the receiver's side -- this adjunct is used
587 * to store encoded data on a backup host. It will only be needed if the
588 * bucket owner fails.
590 public static class ReceiverBucketData {
591 static final String RESTORE_BUCKET_ERROR =
592 "Persistence.ReceiverBucketData.restoreBucket: ";
594 // maps session name into encoded data
595 Map<String, ReceiverSessionBucketData> sessionData = new HashMap<>();
597 // used by the receiver to determine whether an update is really newer
601 byte[] lockData = null;
604 * This method is called in response to the '/persistence/session'
605 * REST message. It stores the base64-encoded and serialized data
606 * for a particular bucket and session.
608 * @param bucketNumber identifies the bucket
609 * @param sessionName identifies the Drools session
610 * @param count counter used to determine whether data is really newer
611 * @param data base64-encoded serialized data for this bucket and session
613 static void receiveSession(int bucketNumber, String sessionName, int count, byte[] data) {
615 Bucket bucket = Bucket.getBucket(bucketNumber);
617 // create/fetch the 'ReceiverBucketData' adjunct
618 ReceiverBucketData rbd = bucket.getAdjunct(ReceiverBucketData.class);
620 // update the session data
621 ReceiverSessionBucketData rsbd = rbd.sessionData.get(sessionName);
623 rsbd = new ReceiverSessionBucketData();
624 rbd.sessionData.put(sessionName, rsbd);
627 if ((count - rsbd.count) > 0 || count == 0) {
630 rsbd.encodedSerializedData = data;
636 * This method is called in response to the '/persistence/lock'
637 * REST message. It stores the base64-encoded and serialized
638 * server-side lock data associated with this bucket.
640 * @param bucketNumber identifies the bucket
641 * @param count counter used to determine whether data is really newer
642 * @param data base64-encoded serialized lock data for this bucket
644 static void receiveLockData(int bucketNumber, int count, byte[] data) {
646 Bucket bucket = Bucket.getBucket(bucketNumber);
648 // create/fetch the 'ReceiverBucketData' adjunct
649 ReceiverBucketData rbd = bucket.getAdjunct(ReceiverBucketData.class);
651 // update the lock data
652 if ((count - rbd.lockCount) > 0 || count == 0) {
653 rbd.lockCount = count;
660 * This method is called when a bucket is being restored from persistent
661 * data, which indicates that a clean migration didn't occur.
662 * Drools session and/or lock data is restored.
664 * @param bucket the bucket being restored
666 synchronized void restoreBucket(Bucket bucket) {
667 // one entry for each Drools session being restored --
668 // indicates when the restore is complete (restore runs within
669 // the Drools session thread)
670 List<CountDownLatch> sessionLatches = restoreBucket_droolsSessions();
673 restoreBucket_locks(bucket);
675 // wait for all of the sessions to update
677 for (CountDownLatch sessionLatch : sessionLatches) {
678 if (!sessionLatch.await(10000L, TimeUnit.MILLISECONDS)) {
679 logger.error("{}: timed out waiting for session latch",
683 } catch (InterruptedException e) {
684 logger.error("Exception in {}", this, e);
685 Thread.currentThread().interrupt();
689 private List<CountDownLatch> restoreBucket_droolsSessions() {
690 List<CountDownLatch> sessionLatches = new LinkedList<>();
691 for (Map.Entry<String, ReceiverSessionBucketData> entry : sessionData.entrySet()) {
692 String sessionName = entry.getKey();
693 ReceiverSessionBucketData rsbd = entry.getValue();
695 // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>"
696 String[] nameSegments = sessionName.split(":");
697 PolicySession policySession = null;
699 // locate the 'PolicyContainer' and 'PolicySession'
700 if (nameSegments.length == 3) {
701 // step through all 'PolicyContainer' instances looking
702 // for a matching 'artifactId' & 'groupId'
703 for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
704 if (nameSegments[1].equals(pc.getArtifactId())
705 && nameSegments[0].equals(pc.getGroupId())) {
706 // 'PolicyContainer' matches -- try to fetch the session
707 policySession = pc.getPolicySession(nameSegments[2]);
713 if (policySession == null) {
714 logger.error(RESTORE_BUCKET_ERROR
715 + "Can't find PolicySession{}", sessionName);
721 // deserialization needs to use the correct 'ClassLoader'
722 obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData),
723 policySession.getPolicyContainer().getClassLoader());
724 } catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
725 logger.error(RESTORE_BUCKET_ERROR
726 + "Failed to read data for session '{}'",
729 // can't decode -- skip this session
733 if (!(obj instanceof Map)) {
734 logger.error(RESTORE_BUCKET_ERROR
735 + "Session '{}' data has class {}, expected 'Map'",
736 sessionName, obj.getClass().getName());
738 // wrong object type decoded -- skip this session
742 // if we reach this point, we have decoded the persistent data
744 final Map<?,?> droolsObjects = (Map<?,?>) obj;
746 // signal when restore is complete
747 final CountDownLatch sessionLatch = new CountDownLatch(1);
749 // 'KieSession' object
750 final KieSession kieSession = policySession.getKieSession();
752 // run the following within the Drools session thread
753 DroolsRunnable insertDroolsObjects = () -> {
755 // insert all of the Drools objects into the session
756 for (Object droolsObj : droolsObjects.keySet()) {
757 kieSession.insert(droolsObj);
761 sessionLatch.countDown();
764 kieSession.insert(insertDroolsObjects);
766 // add this to the set of 'CountDownLatch's we are waiting for
767 sessionLatches.add(sessionLatch);
769 return sessionLatches;
772 private void restoreBucket_locks(Bucket bucket) {
773 if (lockData != null) {
777 obj = Util.deserialize(Base64.getDecoder().decode(lockData));
778 if (obj instanceof GlobalLocks) {
779 bucket.putAdjunct(obj);
781 // send out updated date
782 sendLockDataToBackups(bucket, (GlobalLocks)obj);
784 logger.error(RESTORE_BUCKET_ERROR
785 + "Expected 'GlobalLocks', got '{}'",
786 obj.getClass().getName());
788 } catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
789 logger.error(RESTORE_BUCKET_ERROR
790 + "Failed to read lock data", e);
791 // skip the lock data
798 /* ============================================================ */
801 public static class Rest {
803 * Handle the '/persistence/session' REST call.
806 @Path("/persistence/session")
807 @Consumes(MediaType.APPLICATION_OCTET_STREAM)
808 public void receiveSession(@QueryParam(QP_BUCKET) int bucket,
809 @QueryParam(QP_SESSION) String sessionName,
810 @QueryParam(QP_COUNT) int count,
811 @QueryParam(QP_DEST) UUID dest,
813 logger.debug("/persistence/session: (bucket={},session={},count={}) "
814 + "got {} bytes of data",
815 bucket, sessionName, count, data.length);
816 if (dest == null || dest.equals(Server.getThisServer().getUuid())) {
817 ReceiverBucketData.receiveSession(bucket, sessionName, count, data);
819 // This host is not the intended destination -- this could happen
820 // if it was sent from another site. Leave off the 'dest' param
821 // when forwarding the message, to ensure that we don't have
822 // an infinite forwarding loop, if the site data happens to be bad.
826 if ((server = Server.getServer(dest)) != null
828 server.getWebTarget("persistence/session")) != null) {
829 Entity<String> entity =
830 Entity.entity(new String(data),
831 MediaType.APPLICATION_OCTET_STREAM_TYPE);
833 .queryParam(QP_BUCKET, bucket)
834 .queryParam(QP_SESSION, sessionName)
835 .queryParam(QP_COUNT, count)
836 .request().post(entity);
842 * Handle the '/persistence/lock' REST call.
845 @Path("/persistence/lock")
846 @Consumes(MediaType.APPLICATION_OCTET_STREAM)
847 public void receiveLockData(@QueryParam(QP_BUCKET) int bucket,
848 @QueryParam(QP_COUNT) int count,
849 @QueryParam(QP_DEST) UUID dest,
851 logger.debug("/persistence/lock: (bucket={},count={}) "
852 + "got {} bytes of data", bucket, count, data.length);
853 if (dest == null || dest.equals(Server.getThisServer().getUuid())) {
854 ReceiverBucketData.receiveLockData(bucket, count, data);
856 // This host is not the intended destination -- this could happen
857 // if it was sent from another site. Leave off the 'dest' param
858 // when forwarding the message, to ensure that we don't have
859 // an infinite forwarding loop, if the site data happens to be bad.
863 if ((server = Server.getServer(dest)) != null
864 && (webTarget = server.getWebTarget("persistence/lock")) != null) {
865 Entity<String> entity =
866 Entity.entity(new String(data),
867 MediaType.APPLICATION_OCTET_STREAM_TYPE);
869 .queryParam(QP_BUCKET, bucket)
870 .queryParam(QP_COUNT, count)
871 .request().post(entity);