Reduce Sonar complaints in 'feature-server-pool'
[policy/drools-pdp.git] / feature-server-pool / src / main / java / org / onap / policy / drools / serverpool / persistence / Persistence.java
1 /*
2  * ============LICENSE_START=======================================================
3  * feature-server-pool
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.serverpool.persistence;
22
23 import java.io.ByteArrayOutputStream;
24 import java.io.IOException;
25 import java.io.ObjectOutputStream;
26 import java.util.Base64;
27 import java.util.Collection;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.IdentityHashMap;
31 import java.util.LinkedList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Properties;
35 import java.util.Set;
36 import java.util.UUID;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39
40 import javax.ws.rs.Consumes;
41 import javax.ws.rs.POST;
42 import javax.ws.rs.Path;
43 import javax.ws.rs.QueryParam;
44 import javax.ws.rs.client.Entity;
45 import javax.ws.rs.client.WebTarget;
46 import javax.ws.rs.core.MediaType;
47
48 import org.kie.api.event.rule.ObjectDeletedEvent;
49 import org.kie.api.event.rule.ObjectInsertedEvent;
50 import org.kie.api.event.rule.ObjectUpdatedEvent;
51 import org.kie.api.event.rule.RuleRuntimeEventListener;
52 import org.kie.api.runtime.KieSession;
53 import org.onap.policy.drools.core.DroolsRunnable;
54 import org.onap.policy.drools.core.PolicyContainer;
55 import org.onap.policy.drools.core.PolicySession;
56 import org.onap.policy.drools.core.PolicySessionFeatureApi;
57 import org.onap.policy.drools.serverpool.Bucket;
58 import org.onap.policy.drools.serverpool.Keyword;
59 import org.onap.policy.drools.serverpool.Server;
60 import org.onap.policy.drools.serverpool.ServerPoolApi;
61 import org.onap.policy.drools.serverpool.TargetLock.GlobalLocks;
62 import org.onap.policy.drools.serverpool.Util;
63 import org.onap.policy.drools.system.PolicyControllerConstants;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 /**
68  * This class provides a persistence implementation for 'feature-server-pool',
69  * backing up the data of selected Drools sessions and server-side 'TargetLock'
70  * data on separate hosts.
71  */
72 public class Persistence implements PolicySessionFeatureApi, ServerPoolApi {
73     private static Logger logger = LoggerFactory.getLogger(Persistence.class);
74
75     // HTTP query parameters
76     private static final String QP_BUCKET = "bucket";
77     private static final String QP_SESSION = "session";
78     private static final String QP_COUNT = "count";
79     private static final String QP_DEST = "dest";
80
81     /***************************************/
82     /* 'PolicySessionFeatureApi' interface */
83     /***************************************/
84
85     /**
86      * {@inheritDoc}
87      */
88     @Override
89     public int getSequenceNumber() {
90         return 1;
91     }
92
93     /**
94      * {@inheritDoc}
95      */
96     @Override
97     public void newPolicySession(PolicySession policySession) {
98         // a new Drools session is being created -- look at the properties
99         // 'persistence.<session-name>.type' and 'persistence.type' to determine
100         // whether persistence is enabled for this session
101
102         // fetch properties file
103         PolicyContainer container = policySession.getPolicyContainer();
104         Properties properties = PolicyControllerConstants.getFactory().get(
105             container.getGroupId(), container.getArtifactId()).getProperties();
106
107         // look at 'persistence.<session-name>.type', and 'persistence.type'
108         String type = properties.getProperty("persistence." + policySession.getName() + ".type");
109         if (type == null) {
110             type = properties.getProperty("persistence.type");
111         }
112
113         if ("auto".equals(type) || "native".equals(type)) {
114             // persistence is enabled this session
115             policySession.setAdjunct(PersistenceRunnable.class,
116                                      new PersistenceRunnable(policySession));
117         }
118     }
119
120     /*****************************/
121     /* 'ServerPoolApi' interface */
122     /*****************************/
123
124     /**
125      * {@inheritDoc}
126      */
127     @Override
128     public Collection<Class<?>> servletClasses() {
129         // the nested class 'Rest' contains additional REST calls
130         List<Class<?>> classes = new LinkedList<>();
131         classes.add(Rest.class);
132         return classes;
133     }
134
135     /**
136      * {@inheritDoc}
137      */
138     @Override
139     public void restoreBucket(Bucket bucket) {
140         // if we reach this point, no data was received from the old server, which
141         // means we just initialized, or we did not have a clean bucket migration
142
143         ReceiverBucketData rbd = bucket.removeAdjunct(ReceiverBucketData.class);
144         if (rbd != null) {
145             // there is backup data -- do a restore
146             rbd.restoreBucket(bucket);
147         }
148     }
149
150     /**
151      * {@inheritDoc}
152      */
153     @Override
154     public void lockUpdate(Bucket bucket, GlobalLocks globalLocks) {
155         // we received a notification from 'TargetLock' that 'GlobalLocks' data
156         // has changed (TBD: should any attempt be made to group updates that
157         // occur in close succession?)
158
159         sendLockDataToBackups(bucket, globalLocks);
160     }
161
162     /**
163      * {@inheritDoc}
164      */
165     @Override
166     public void auditBucket(Bucket bucket, boolean isOwner, boolean isBackup) {
167         if (isOwner) {
168             // it may be that backup hosts have changed --
169             // send out lock and session data
170
171             // starting with lock data
172             GlobalLocks globalLocks =
173                 bucket.getAdjunctDontCreate(GlobalLocks.class);
174             if (globalLocks != null) {
175                 sendLockDataToBackups(bucket, globalLocks);
176             }
177
178             // now, session data
179             SenderBucketData sbd =
180                 bucket.getAdjunctDontCreate(SenderBucketData.class);
181             if (sbd != null) {
182                 synchronized (sbd) {
183                     // go through all of the sessions where we have persistent data
184                     for (PolicySession session : sbd.sessionData.keySet()) {
185                         Object obj = session.getAdjunct(PersistenceRunnable.class);
186                         if (obj instanceof PersistenceRunnable) {
187                             PersistenceRunnable pr = (PersistenceRunnable)obj;
188                             synchronized (pr.modifiedBuckets) {
189                                 // mark bucket associated with this session
190                                 // as modified
191                                 pr.modifiedBuckets.add(bucket);
192                             }
193                         }
194                     }
195                 }
196             }
197         } else if (bucket.removeAdjunct(SenderBucketData.class) != null) {
198             logger.warn("Bucket {}: Removed superfluous "
199                         + "'SenderBucketData' adjunct",
200                         bucket.getIndex());
201         }
202         if (!isBackup && bucket.removeAdjunct(ReceiverBucketData.class) != null) {
203             logger.warn("Bucket {}: Removed superfluous "
204                         + "'ReceiverBucketData' adjunct",
205                         bucket.getIndex());
206         }
207     }
208
209     /**
210      * This method supports 'lockUpdate' -- it has been moved to a separate
211      * 'static' method, so it can also be called after restoring 'GlobalLocks',
212      * so it can be backed up on its new servers.
213      *
214      * @param bucket the bucket containing the 'GlobalLocks' adjunct
215      * @param globalLocks the 'GlobalLocks' adjunct
216      */
217     private static void sendLockDataToBackups(final Bucket bucket, final GlobalLocks globalLocks) {
218         final int bucketNumber = bucket.getIndex();
219         SenderBucketData sbd = bucket.getAdjunct(SenderBucketData.class);
220         int lockCount = 0;
221
222         // serialize the 'globalLocks' instance
223         ByteArrayOutputStream bos = new ByteArrayOutputStream();
224         try {
225             ObjectOutputStream oos = new ObjectOutputStream(bos);
226             synchronized (globalLocks) {
227                 // the 'GlobalLocks' instance and counter are tied together
228                 oos.writeObject(globalLocks);
229                 lockCount = sbd.getLockCountAndIncrement();
230             }
231             oos.close();
232         } catch (IOException e) {
233             logger.error("Persistence.LockUpdate({})", bucketNumber, e);
234             return;
235         }
236
237         // convert to Base64, and generate an 'Entity' for the REST call
238         byte[] serializedData = Base64.getEncoder().encode(bos.toByteArray());
239         final Entity<String> entity =
240             Entity.entity(new String(serializedData),
241                           MediaType.APPLICATION_OCTET_STREAM_TYPE);
242         final int count = lockCount;
243
244         // build list of backup servers
245         Set<Server> servers = new HashSet<>();
246         synchronized (bucket) {
247             servers.add(bucket.getPrimaryBackup());
248             servers.add(bucket.getSecondaryBackup());
249         }
250         for (final Server server : servers) {
251             if (server != null) {
252                 // send out REST command
253                 server.getThreadPool().execute(() -> {
254                     WebTarget webTarget =
255                         server.getWebTarget("persistence/lock");
256                     if (webTarget != null) {
257                         webTarget
258                         .queryParam(QP_BUCKET, bucketNumber)
259                         .queryParam(QP_COUNT, count)
260                         .queryParam(QP_DEST, server.getUuid())
261                         .request().post(entity);
262                     }
263                 });
264             }
265         }
266     }
267
268     /* ============================================================ */
269
270     /**
271      * One instance of this class exists for every Drools session that is
272      * being backed up. It implements the 'RuleRuntimeEventListener' interface,
273      * so it receives notifications of Drools object changes, and also implements
274      * the 'DroolsRunnable' interface, so it can run within the Drools session
275      * thread, which should reduce the chance of catching an object in a
276      * transient state.
277      */
278     static class PersistenceRunnable implements DroolsRunnable,
279         RuleRuntimeEventListener {
280         // this is the Drools session associated with this instance
281         private PolicySession session;
282
283         // this is the string "<groupId>:<artifactId>:<sessionName>"
284         private String encodedSessionName;
285
286         // the buckets in this session which have modifications that still
287         // need to be backed up
288         private Set<Bucket> modifiedBuckets = new HashSet<>();
289
290         /**
291          * Constructor - save the session information, and start listing for
292          * updates.
293          */
294         PersistenceRunnable(PolicySession session) {
295             PolicyContainer pc = session.getPolicyContainer();
296
297             this.session = session;
298             this.encodedSessionName =
299                 pc.getGroupId() + ":" + pc.getArtifactId() + ":" + session.getName();
300             session.getKieSession().addEventListener(this);
301         }
302
303         /******************************/
304         /* 'DroolsRunnable' interface */
305         /******************************/
306
307         /**
308          * {@inheritDoc}
309          */
310         @Override
311         public void run() {
312             try {
313                 // save a snapshot of 'modifiedBuckets'
314                 Set<Bucket> saveModifiedBuckets;
315                 synchronized (modifiedBuckets) {
316                     saveModifiedBuckets = new HashSet<>(modifiedBuckets);
317                     modifiedBuckets.clear();
318                 }
319
320                 // iterate over all of the modified buckets, sending update data
321                 // to all of the backup servers
322                 for (Bucket bucket : saveModifiedBuckets) {
323                     SenderBucketData sbd =
324                         bucket.getAdjunctDontCreate(SenderBucketData.class);
325                     if (sbd != null) {
326                         // serialization occurs within the Drools session thread
327                         SenderSessionBucketData ssbd = sbd.getSessionData(session);
328                         byte[] serializedData =
329                             ssbd.getLatestEncodedSerializedData();
330                         final int count = ssbd.getCount();
331                         final Entity<String> entity =
332                             Entity.entity(new String(serializedData),
333                                           MediaType.APPLICATION_OCTET_STREAM_TYPE);
334
335                         // build list of backup servers
336                         Set<Server> servers = new HashSet<>();
337                         synchronized (bucket) {
338                             servers.add(bucket.getPrimaryBackup());
339                             servers.add(bucket.getSecondaryBackup());
340                         }
341                         for (final Server server : servers) {
342                             if (server != null) {
343                                 // send out REST command
344                                 server.getThreadPool().execute(() -> {
345                                     WebTarget webTarget =
346                                         server.getWebTarget("persistence/session");
347                                     if (webTarget != null) {
348                                         webTarget
349                                         .queryParam(QP_BUCKET,
350                                                     bucket.getIndex())
351                                         .queryParam(QP_SESSION,
352                                                     encodedSessionName)
353                                         .queryParam(QP_COUNT, count)
354                                         .queryParam(QP_DEST, server.getUuid())
355                                         .request().post(entity);
356                                     }
357                                 });
358                             }
359                         }
360                     }
361                 }
362             } catch (Exception e) {
363                 logger.error("Persistence.PersistenceRunnable.run:", e);
364             }
365         }
366
367         /****************************************/
368         /* 'RuleRuntimeEventListener' interface */
369         /****************************************/
370
371         /**
372          * {@inheritDoc}
373          */
374         @Override
375         public void objectDeleted(ObjectDeletedEvent event) {
376             // determine Drools object that was deleted
377             Object object = event.getOldObject();
378
379             // determine keyword, if any
380             String keyword = Keyword.lookupKeyword(object);
381             if (keyword == null) {
382                 // no keyword, so there is no associated bucket
383                 return;
384             }
385
386             // locate bucket and associated data
387             // (don't create adjunct if it isn't there -- there's nothing to delete)
388             Bucket bucket = Bucket.getBucket(keyword);
389             SenderBucketData sbd =
390                 bucket.getAdjunctDontCreate(SenderBucketData.class);
391             if (sbd != null) {
392                 // add bucket to 'modified' list
393                 synchronized (modifiedBuckets) {
394                     modifiedBuckets.add(bucket);
395                 }
396
397                 // update set of Drools objects in this bucket
398                 sbd.getSessionData(session).objectDeleted(object);
399
400                 // insert this 'DroolsRunnable' to do the backup (note that it
401                 // may already be inserted from a previous update to this
402                 // DroolsSession -- eventually, the rule will fire, and the 'run'
403                 // method will be called)
404                 session.getKieSession().insert(this);
405             }
406         }
407
408         /**
409          * {@inheritDoc}
410          */
411         @Override
412         public void objectInserted(ObjectInsertedEvent event) {
413             objectChanged(event.getObject());
414         }
415
416         /**
417          * {@inheritDoc}
418          */
419         @Override
420         public void objectUpdated(ObjectUpdatedEvent event) {
421             objectChanged(event.getObject());
422         }
423
424         /**
425          * A Drools session object was either inserted or updated
426          * (both are treated the same way).
427          *
428          * @param object the object being inserted or updated
429          */
430         private void objectChanged(Object object) {
431             // determine keyword, if any
432             String keyword = Keyword.lookupKeyword(object);
433             if (keyword == null) {
434                 // no keyword, so there is no associated bucket
435                 return;
436             }
437
438             // locate bucket and associated data
439             Bucket bucket = Bucket.getBucket(keyword);
440             SenderBucketData sbd = bucket.getAdjunct(SenderBucketData.class);
441
442             // add bucket to 'modified' list
443             synchronized (modifiedBuckets) {
444                 modifiedBuckets.add(bucket);
445             }
446
447             // update set of Drools objects in this bucket
448             sbd.getSessionData(session).objectChanged(object);
449
450             // insert this 'DroolsRunnable' to do the backup (note that it
451             // may already be inserted from a previous update to this
452             // DroolsSession -- eventually, the rule will fire, and the 'run'
453             // method will be called)
454             session.getKieSession().insert(this);
455         }
456     }
457
458     /* ============================================================ */
459
460     /**
461      * Per-session data for a single bucket on the sender's side.
462      */
463     static class SenderSessionBucketData {
464         // the set of all objects in the session associated with this bucket
465         Map<Object,Object> droolsObjects = new IdentityHashMap<>();
466
467         // used by the receiver to determine whether an update is really newer
468         int count = 0;
469
470         // serialized base64 form of 'droolsObjects'
471         // (TBD: determine if we are getting any benefit from caching this)
472         byte[] encodedSerializedData = null;
473
474         // 'true' means that 'encodedSerializedData' is out-of-date
475         boolean rebuildNeeded = true;
476
477         /**
478          * Notification that a Drools object associated with this bucket
479          * was deleted.
480          *
481          * @param object the object that was deleted
482          */
483         synchronized void objectDeleted(Object object) {
484             if (droolsObjects.remove(object) != null) {
485                 rebuildNeeded = true;
486             }
487         }
488
489         /**
490          * Notification that a Drools object associated with this bucket
491          * was inserted or updated.
492          *
493          * @param object the object that was updated
494          */
495         synchronized void objectChanged(Object object) {
496             droolsObjects.put(object, object);
497             rebuildNeeded = true;
498         }
499
500         /**
501          * Serialize and base64-encode the objects in this Drools session.
502          *
503          * @return a byte array containing the encoded serialized objects
504          */
505         synchronized byte[] getLatestEncodedSerializedData() {
506             if (rebuildNeeded) {
507                 try {
508                     // this should be run in the Drools session thread in order
509                     // to avoid transient data
510                     encodedSerializedData =
511                         Base64.getEncoder().encode(Util.serialize(droolsObjects));
512                     count += 1;
513                 } catch (IOException e) {
514                     logger.error("Persistence.SenderSessionBucketData."
515                                  + "getLatestEncodedSerializedData: ", e);
516                 }
517                 rebuildNeeded = false;
518             }
519             return encodedSerializedData;
520         }
521
522         /**
523          * Return a counter that will be used for update comparison.
524          *
525          * @return the value of a counter that can be used to determine whether
526          *     an update is really newer than the previous update
527          */
528         synchronized int getCount() {
529             return count;
530         }
531     }
532
533     /* ============================================================ */
534
535     /**
536      * Data for a single bucket on the sender's side.
537      */
538     public static class SenderBucketData {
539         // maps session name into SenderSessionBucketData
540         Map<PolicySession, SenderSessionBucketData> sessionData =
541             new IdentityHashMap<>();
542
543         // used by the receiver to determine whether an update is really newer
544         int lockCount = 0;
545
546         /**
547          * Create or fetch the 'SenderSessionBucketData' instance associated
548          * with the specified session.
549          *
550          * @param session the 'PolicySession' object
551          * @return the associated 'SenderSessionBucketData' instance
552          */
553         synchronized SenderSessionBucketData getSessionData(PolicySession session) {
554             return sessionData.computeIfAbsent(session, key -> new SenderSessionBucketData());
555         }
556
557         /**
558          * Return a counter that will be used for update comparison.
559          *
560          * @return the value of a counter that can be used to determine whether
561          *     an update is really newer than the previous update
562          */
563         int getLockCountAndIncrement() {
564             // note that this is synchronized using the 'GlobalLocks' instance
565             // within the same bucket
566             return lockCount++;
567         }
568     }
569
570     /* ============================================================ */
571
572     /**
573      * Data for a single bucket and session on the receiver's side.
574      */
575     static class ReceiverSessionBucketData {
576         // used to determine whether an update is really newer
577         int count = -1;
578
579         // serialized base64 form of 'droolsObjects'
580         byte[] encodedSerializedData = null;
581     }
582
583     /* ============================================================ */
584
585     /**
586      * Data for a single bucket on the receiver's side -- this adjunct is used
587      * to store encoded data on a backup host. It will only be needed if the
588      * bucket owner fails.
589      */
590     public static class ReceiverBucketData {
591         static final String RESTORE_BUCKET_ERROR =
592             "Persistence.ReceiverBucketData.restoreBucket: ";
593
594         // maps session name into encoded data
595         Map<String, ReceiverSessionBucketData> sessionData = new HashMap<>();
596
597         // used by the receiver to determine whether an update is really newer
598         int lockCount = -1;
599
600         // encoded lock data
601         byte[] lockData = null;
602
603         /**
604          * This method is called in response to the '/persistence/session'
605          * REST message. It stores the base64-encoded and serialized data
606          * for a particular bucket and session.
607          *
608          * @param bucketNumber identifies the bucket
609          * @param sessionName identifies the Drools session
610          * @param count counter used to determine whether data is really newer
611          * @param data base64-encoded serialized data for this bucket and session
612          */
613         static void receiveSession(int bucketNumber, String sessionName, int count, byte[] data) {
614             // fetch the bucket
615             Bucket bucket = Bucket.getBucket(bucketNumber);
616
617             // create/fetch the 'ReceiverBucketData' adjunct
618             ReceiverBucketData rbd = bucket.getAdjunct(ReceiverBucketData.class);
619             synchronized (rbd) {
620                 // update the session data
621                 ReceiverSessionBucketData rsbd = rbd.sessionData.get(sessionName);
622                 if (rsbd == null) {
623                     rsbd = new ReceiverSessionBucketData();
624                     rbd.sessionData.put(sessionName, rsbd);
625                 }
626
627                 if ((count - rsbd.count) > 0 || count == 0) {
628                     // this is new data
629                     rsbd.count = count;
630                     rsbd.encodedSerializedData = data;
631                 }
632             }
633         }
634
635         /**
636          * This method is called in response to the '/persistence/lock'
637          * REST message. It stores the base64-encoded and serialized
638          * server-side lock data associated with this bucket.
639          *
640          * @param bucketNumber identifies the bucket
641          * @param count counter used to determine whether data is really newer
642          * @param data base64-encoded serialized lock data for this bucket
643          */
644         static void receiveLockData(int bucketNumber, int count, byte[] data) {
645             // fetch the bucket
646             Bucket bucket = Bucket.getBucket(bucketNumber);
647
648             // create/fetch the 'ReceiverBucketData' adjunct
649             ReceiverBucketData rbd = bucket.getAdjunct(ReceiverBucketData.class);
650             synchronized (rbd) {
651                 // update the lock data
652                 if ((count - rbd.lockCount) > 0 || count == 0) {
653                     rbd.lockCount = count;
654                     rbd.lockData = data;
655                 }
656             }
657         }
658
659         /**
660          * This method is called when a bucket is being restored from persistent
661          * data, which indicates that a clean migration didn't occur.
662          * Drools session and/or lock data is restored.
663          *
664          * @param bucket the bucket being restored
665          */
666         synchronized void restoreBucket(Bucket bucket) {
667             // one entry for each Drools session being restored --
668             // indicates when the restore is complete (restore runs within
669             // the Drools session thread)
670             List<CountDownLatch> sessionLatches = restoreBucket_droolsSessions();
671
672             // restore lock data
673             restoreBucket_locks(bucket);
674
675             // wait for all of the sessions to update
676             try {
677                 for (CountDownLatch sessionLatch : sessionLatches) {
678                     if (!sessionLatch.await(10000L, TimeUnit.MILLISECONDS)) {
679                         logger.error("{}: timed out waiting for session latch",
680                                      this);
681                     }
682                 }
683             } catch (InterruptedException e) {
684                 logger.error("Exception in {}", this, e);
685                 Thread.currentThread().interrupt();
686             }
687         }
688
689         private List<CountDownLatch> restoreBucket_droolsSessions() {
690             List<CountDownLatch> sessionLatches = new LinkedList<>();
691             for (Map.Entry<String, ReceiverSessionBucketData> entry : sessionData.entrySet()) {
692                 String sessionName = entry.getKey();
693                 ReceiverSessionBucketData rsbd = entry.getValue();
694
695                 // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>"
696                 String[] nameSegments = sessionName.split(":");
697                 PolicySession policySession = null;
698
699                 // locate the 'PolicyContainer' and 'PolicySession'
700                 if (nameSegments.length == 3) {
701                     // step through all 'PolicyContainer' instances looking
702                     // for a matching 'artifactId' & 'groupId'
703                     for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
704                         if (nameSegments[1].equals(pc.getArtifactId())
705                                 && nameSegments[0].equals(pc.getGroupId())) {
706                             // 'PolicyContainer' matches -- try to fetch the session
707                             policySession = pc.getPolicySession(nameSegments[2]);
708                             break;
709                         }
710                     }
711                 }
712
713                 if (policySession == null) {
714                     logger.error(RESTORE_BUCKET_ERROR
715                                  + "Can't find PolicySession{}", sessionName);
716                     continue;
717                 }
718
719                 Object obj = null;
720                 try {
721                     // deserialization needs to use the correct 'ClassLoader'
722                     obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData),
723                         policySession.getPolicyContainer().getClassLoader());
724                 } catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
725                     logger.error(RESTORE_BUCKET_ERROR
726                                  + "Failed to read data for session '{}'",
727                                  sessionName, e);
728
729                     // can't decode -- skip this session
730                     continue;
731                 }
732
733                 if (!(obj instanceof Map)) {
734                     logger.error(RESTORE_BUCKET_ERROR
735                                  + "Session '{}' data has class {}, expected 'Map'",
736                                  sessionName, obj.getClass().getName());
737
738                     // wrong object type decoded -- skip this session
739                     continue;
740                 }
741
742                 // if we reach this point, we have decoded the persistent data
743
744                 final Map<?,?> droolsObjects = (Map<?,?>) obj;
745
746                 // signal when restore is complete
747                 final CountDownLatch sessionLatch = new CountDownLatch(1);
748
749                 // 'KieSession' object
750                 final KieSession kieSession = policySession.getKieSession();
751
752                 // run the following within the Drools session thread
753                 DroolsRunnable insertDroolsObjects = () -> {
754                     try {
755                         // insert all of the Drools objects into the session
756                         for (Object droolsObj : droolsObjects.keySet()) {
757                             kieSession.insert(droolsObj);
758                         }
759                     } finally {
760                         // signal completion
761                         sessionLatch.countDown();
762                     }
763                 };
764                 kieSession.insert(insertDroolsObjects);
765
766                 // add this to the set of 'CountDownLatch's we are waiting for
767                 sessionLatches.add(sessionLatch);
768             }
769             return sessionLatches;
770         }
771
772         private void restoreBucket_locks(Bucket bucket) {
773             if (lockData != null) {
774                 Object obj = null;
775                 try {
776                     // decode lock data
777                     obj = Util.deserialize(Base64.getDecoder().decode(lockData));
778                     if (obj instanceof GlobalLocks) {
779                         bucket.putAdjunct(obj);
780
781                         // send out updated date
782                         sendLockDataToBackups(bucket, (GlobalLocks)obj);
783                     } else {
784                         logger.error(RESTORE_BUCKET_ERROR
785                                      + "Expected 'GlobalLocks', got '{}'",
786                                      obj.getClass().getName());
787                     }
788                 } catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
789                     logger.error(RESTORE_BUCKET_ERROR
790                                  + "Failed to read lock data", e);
791                     // skip the lock data
792                 }
793
794             }
795         }
796     }
797
798     /* ============================================================ */
799
800     @Path("/")
801     public static class Rest {
802         /**
803          * Handle the '/persistence/session' REST call.
804          */
805         @POST
806         @Path("/persistence/session")
807         @Consumes(MediaType.APPLICATION_OCTET_STREAM)
808         public void receiveSession(@QueryParam(QP_BUCKET) int bucket,
809                                    @QueryParam(QP_SESSION) String sessionName,
810                                    @QueryParam(QP_COUNT) int count,
811                                    @QueryParam(QP_DEST) UUID dest,
812                                    byte[] data) {
813             logger.debug("/persistence/session: (bucket={},session={},count={}) "
814                          + "got {} bytes of data",
815                          bucket, sessionName, count, data.length);
816             if (dest == null || dest.equals(Server.getThisServer().getUuid())) {
817                 ReceiverBucketData.receiveSession(bucket, sessionName, count, data);
818             } else {
819                 // This host is not the intended destination -- this could happen
820                 // if it was sent from another site. Leave off the 'dest' param
821                 // when forwarding the message, to ensure that we don't have
822                 // an infinite forwarding loop, if the site data happens to be bad.
823                 Server server;
824                 WebTarget webTarget;
825
826                 if ((server = Server.getServer(dest)) != null
827                         && (webTarget =
828                                 server.getWebTarget("persistence/session")) != null) {
829                     Entity<String> entity =
830                         Entity.entity(new String(data),
831                                       MediaType.APPLICATION_OCTET_STREAM_TYPE);
832                     webTarget
833                     .queryParam(QP_BUCKET, bucket)
834                     .queryParam(QP_SESSION, sessionName)
835                     .queryParam(QP_COUNT, count)
836                     .request().post(entity);
837                 }
838             }
839         }
840
841         /**
842          * Handle the '/persistence/lock' REST call.
843          */
844         @POST
845         @Path("/persistence/lock")
846         @Consumes(MediaType.APPLICATION_OCTET_STREAM)
847         public void receiveLockData(@QueryParam(QP_BUCKET) int bucket,
848                                     @QueryParam(QP_COUNT) int count,
849                                     @QueryParam(QP_DEST) UUID dest,
850                                     byte[] data) {
851             logger.debug("/persistence/lock: (bucket={},count={}) "
852                          + "got {} bytes of data", bucket, count, data.length);
853             if (dest == null || dest.equals(Server.getThisServer().getUuid())) {
854                 ReceiverBucketData.receiveLockData(bucket, count, data);
855             } else {
856                 // This host is not the intended destination -- this could happen
857                 // if it was sent from another site. Leave off the 'dest' param
858                 // when forwarding the message, to ensure that we don't have
859                 // an infinite forwarding loop, if the site data happens to be bad.
860                 Server server;
861                 WebTarget webTarget;
862
863                 if ((server = Server.getServer(dest)) != null
864                         && (webTarget = server.getWebTarget("persistence/lock")) != null) {
865                     Entity<String> entity =
866                         Entity.entity(new String(data),
867                                       MediaType.APPLICATION_OCTET_STREAM_TYPE);
868                     webTarget
869                     .queryParam(QP_BUCKET, bucket)
870                     .queryParam(QP_COUNT, count)
871                     .request().post(entity);
872                 }
873             }
874         }
875     }
876 }