Release DR images via self release
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / SynchronizerTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning;
26
27 import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT;
28
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
31
32 import java.io.ByteArrayOutputStream;
33 import java.io.File;
34 import java.io.FileInputStream;
35 import java.io.InputStream;
36 import java.net.InetAddress;
37 import java.net.UnknownHostException;
38 import java.nio.file.Files;
39 import java.nio.file.Path;
40 import java.nio.file.Paths;
41 import java.nio.file.StandardCopyOption;
42 import java.security.KeyStore;
43 import java.sql.Connection;
44 import java.sql.SQLException;
45 import java.util.ArrayList;
46 import java.util.Arrays;
47 import java.util.Collection;
48 import java.util.HashMap;
49 import java.util.Map;
50 import java.util.Properties;
51 import java.util.Set;
52 import java.util.Timer;
53 import java.util.TimerTask;
54 import java.util.TreeSet;
55
56 import javax.servlet.http.HttpServletResponse;
57
58 import org.apache.http.HttpEntity;
59 import org.apache.http.HttpResponse;
60 import org.apache.http.client.methods.HttpGet;
61 import org.apache.http.client.methods.HttpPost;
62 import org.apache.http.conn.scheme.Scheme;
63 import org.apache.http.conn.ssl.SSLSocketFactory;
64 import org.apache.http.entity.ByteArrayEntity;
65 import org.apache.http.entity.ContentType;
66 import org.apache.http.impl.client.AbstractHttpClient;
67 import org.apache.http.impl.client.DefaultHttpClient;
68 import org.json.JSONArray;
69 import org.json.JSONException;
70 import org.json.JSONObject;
71 import org.json.JSONTokener;
72 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
73 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
74 import org.onap.dmaap.datarouter.provisioning.beans.Group;
75 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
76 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
77 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
78 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
79 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
80 import org.onap.dmaap.datarouter.provisioning.utils.AafPropsUtils;
81 import org.onap.dmaap.datarouter.provisioning.utils.DB;
82 import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
83 import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
84 import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
85
86 /**
87  * This class handles synchronization between provisioning servers (PODs).  It has three primary functions:
88  * <ol>
89  * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
90  * the active (master) POD.</li>
91  * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
92  * <li>Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD)
93  * of this POD.</li>
94  * </ol>
95  * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
96  * <code>
97  * Security.setProperty("networkaddress.cache.ttl", "10");
98  * </code>
99  *
100  * @author Robert Eby
101  * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
102  */
103
104 public class SynchronizerTask extends TimerTask {
105
106     /**
107      * This is a singleton -- there is only one SynchronizerTask object in the server.
108      */
109     private static SynchronizerTask synctask;
110
111     /**
112      * This POD is unknown -- not on the list of PODs.
113      */
114     public static final int UNKNOWN_POD = 0;
115     /**
116      * This POD is active -- on the list of PODs, and the DNS CNAME points to us.
117      */
118     public static final int ACTIVE_POD = 1;
119     /**
120      * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us.
121      */
122     public static final int STANDBY_POD = 2;
123
124     private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"};
125     private static final long ONE_HOUR = 60 * 60 * 1000L;
126
127     private long nextMsg = 0;    // only display the "Current podState" msg every 5 mins.
128
129     private final EELFLogger logger;
130     private final Timer rolex;
131     private final String spooldir;
132     private int podState;
133     private boolean doFetch;
134     private long nextsynctime;
135     private AbstractHttpClient httpclient = null;
136
137     @SuppressWarnings("deprecation")
138     private SynchronizerTask() {
139         logger = EELFManager.getInstance().getLogger("InternalLog");
140         rolex = new Timer();
141         spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
142         podState = UNKNOWN_POD;
143         doFetch = true;        // start off with a fetch
144         nextsynctime = 0;
145
146         logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD");
147         try {
148             // Set up keystore
149             String type = AafPropsUtils.KEYSTORE_TYPE_PROPERTY;
150             String store = Main.aafPropsUtils.getKeystorePathProperty();
151             String pass = Main.aafPropsUtils.getKeystorePassProperty();
152             KeyStore keyStore = KeyStore.getInstance(type);
153             try (FileInputStream instream = new FileInputStream(new File(store))) {
154                 keyStore.load(instream, pass.toCharArray());
155
156             }
157             // Set up truststore
158             store = Main.aafPropsUtils.getTruststorePathProperty();
159             pass = Main.aafPropsUtils.getTruststorePassProperty();
160             KeyStore trustStore = null;
161             if (store != null && store.length() > 0) {
162                 trustStore = KeyStore.getInstance(AafPropsUtils.TRUESTSTORE_TYPE_PROPERTY);
163                 try (FileInputStream instream = new FileInputStream(new File(store))) {
164                     trustStore.load(instream, pass.toCharArray());
165
166                 }
167             }
168
169             // We are connecting with the node name, but the certificate will have the CNAME
170             // So we need to accept a non-matching certificate name
171             String keystorepass = Main.aafPropsUtils.getKeystorePassProperty();
172             try (AbstractHttpClient hc = new DefaultHttpClient()) {
173                 SSLSocketFactory socketFactory =
174                         (trustStore == null)
175                                 ? new SSLSocketFactory(keyStore, keystorepass)
176                                 : new SSLSocketFactory(keyStore, keystorepass, trustStore);
177                 socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
178                 Scheme sch = new Scheme("https", 443, socketFactory);
179                 hc.getConnectionManager().getSchemeRegistry().register(sch);
180                 httpclient = hc;
181             }
182             setSynchTimer(new DB().getProperties().getProperty(
183                 "org.onap.dmaap.datarouter.provserver.sync_interval", "5000"));
184         } catch (Exception e) {
185             logger.warn("PROV5005: Problem starting the synchronizer: " + e);
186         }
187     }
188
189     private void setSynchTimer(String strInterval) {
190         // Run once every 5 seconds to check DNS, etc.
191         long interval;
192         try {
193             interval = Long.parseLong(strInterval);
194         } catch (NumberFormatException e) {
195             interval = 5000L;
196         }
197         rolex.scheduleAtFixedRate(this, 0L, interval);
198     }
199
200     /**
201      * Get the singleton SynchronizerTask object.
202      *
203      * @return the SynchronizerTask
204      */
205     public static synchronized SynchronizerTask getSynchronizer() {
206         if (synctask == null) {
207             synctask = new SynchronizerTask();
208         }
209         return synctask;
210     }
211
212     /**
213      * What is the podState of this POD?.
214      *
215      * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD
216      */
217     public int getPodState() {
218         return podState;
219     }
220
221     /**
222      * Is this the active POD?.
223      *
224      * @return true if we are active (the master), false otherwise
225      */
226     public boolean isActive() {
227         return podState == ACTIVE_POD;
228     }
229
230     /**
231      * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
232      * should re-synchronize with the master.
233      */
234     void doFetch() {
235         doFetch = true;
236     }
237
238     /**
239      * Runs once a minute in order to <ol>
240      * <li>lookup DNS names,</li>
241      * <li>determine the podState of this POD,</li>
242      * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.</li>
243      * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
244      * </ol>.
245      */
246     @Override
247     public void run() {
248         try {
249             podState = lookupState();
250             if (podState == STANDBY_POD) {
251                 // Only copy provisioning data FROM the active server TO the standby
252                 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
253                     syncProvisioningData();
254                     logger.info("PROV5013: Sync completed.");
255                     nextsynctime = System.currentTimeMillis() + ONE_HOUR;
256                 }
257             } else {
258                 // Don't do fetches on non-standby PODs
259                 doFetch = false;
260             }
261
262             // Fetch DR logs as needed - server to server
263             LogfileLoader lfl = LogfileLoader.getLoader();
264             if (lfl.isIdle()) {
265                 // Only fetch new logs if the loader is waiting for them.
266                 logger.trace("Checking for logs to replicate...");
267                 RLEBitSet local = lfl.getBitSet();
268                 RLEBitSet remote = readRemoteLoglist();
269                 remote.andNot(local);
270                 if (!remote.isEmpty()) {
271                     logger.debug(" Replicating logs: " + remote);
272                     replicateDataRouterLogs(remote);
273                 }
274             }
275         } catch (Exception e) {
276             logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e);
277         }
278     }
279
280     private void syncProvisioningData() {
281         logger.debug("Initiating a sync...");
282         JSONObject jo = readProvisioningJson();
283         if (jo != null) {
284             doFetch = false;
285             syncFeeds(jo.getJSONArray("feeds"));
286             syncSubs(jo.getJSONArray("subscriptions"));
287             syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
288             syncParams(jo.getJSONObject("parameters"));
289             // The following will not be present in a version=1.0 provfeed
290             JSONArray ja = jo.optJSONArray("ingress");
291             if (ja != null) {
292                 syncIngressRoutes(ja);
293             }
294             JSONObject j2 = jo.optJSONObject("egress");
295             if (j2 != null) {
296                 syncEgressRoutes(j2);
297             }
298             ja = jo.optJSONArray("routing");
299             if (ja != null) {
300                 syncNetworkRoutes(ja);
301             }
302         }
303     }
304
305     /**
306      * This method is used to lookup the CNAME that points to the active server.
307      * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server.
308      *
309      * @return the current podState
310      */
311     int lookupState() {
312         int newPodState = UNKNOWN_POD;
313         try {
314             InetAddress myaddr = InetAddress.getLocalHost();
315             if (logger.isTraceEnabled()) {
316                 logger.trace("My address: " + myaddr);
317             }
318             String thisPod = myaddr.getHostName();
319             Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
320             if (pods.contains(thisPod)) {
321                 InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
322                 newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD;
323                 if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
324                     logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]);
325                     nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
326                 }
327             } else {
328                 logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers.");
329             }
330         } catch (UnknownHostException e) {
331             logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
332         }
333
334         if (newPodState != podState) {
335             logger.info(String.format("PROV5001: Server podState changed from %s to %s",
336                     stnames[podState], stnames[newPodState]));
337         }
338         return newPodState;
339     }
340
341     /**
342      * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
343      */
344     private void syncFeeds(JSONArray ja) {
345         Collection<Syncable> coll = new ArrayList<>();
346         for (int n = 0; n < ja.length(); n++) {
347             try {
348                 Feed feed = new Feed(ja.getJSONObject(n));
349                 coll.add(feed);
350             } catch (Exception e) {
351                 logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n), e);
352             }
353         }
354         if (sync(coll, Feed.getAllFeeds())) {
355             BaseServlet.provisioningDataChanged();
356         }
357     }
358
359     /**
360      * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB.
361      */
362     private void syncSubs(JSONArray ja) {
363         Collection<Syncable> coll = new ArrayList<>();
364         for (int n = 0; n < ja.length(); n++) {
365             try {
366                 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
367                 JSONObject jsonObject = ja.getJSONObject(n);
368                 jsonObject.put("sync", "true");
369                 Subscription sub = new Subscription(jsonObject);
370                 coll.add(sub);
371             } catch (Exception e) {
372                 logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n), e);
373             }
374         }
375         if (sync(coll, Subscription.getAllSubscriptions())) {
376             BaseServlet.provisioningDataChanged();
377         }
378     }
379
380     /**
381      * Rally:US708115  - Synchronize the Groups in the JSONArray, with the Groups in the DB.
382      */
383     private void syncGroups(JSONArray ja) {
384         Collection<Syncable> coll = new ArrayList<>();
385         for (int n = 0; n < ja.length(); n++) {
386             try {
387                 Group group = new Group(ja.getJSONObject(n));
388                 coll.add(group);
389             } catch (Exception e) {
390                 logger.warn("PROV5004: Invalid object in group: " + ja.optJSONObject(n), e);
391             }
392         }
393         if (sync(coll, Group.getAllgroups())) {
394             BaseServlet.provisioningDataChanged();
395         }
396     }
397
398
399     /**
400      * Synchronize the Parameters in the JSONObject, with the Parameters in the DB.
401      */
402     private void syncParams(JSONObject jo) {
403         Collection<Syncable> coll = new ArrayList<>();
404         for (String k : jo.keySet()) {
405             String val = "";
406             try {
407                 val = jo.getString(k);
408             } catch (JSONException e) {
409                 logger.warn("PROV5004: Invalid object in parameters: " + jo.optJSONObject(k), e);
410                 try {
411                     val = "" + jo.getInt(k);
412                 } catch (JSONException e1) {
413                     logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1);
414                     JSONArray ja = jo.getJSONArray(k);
415                     for (int i = 0; i < ja.length(); i++) {
416                         if (i > 0) {
417                             val += "|";
418                         }
419                         val += ja.getString(i);
420                     }
421                 }
422             }
423             coll.add(new Parameters(k, val));
424         }
425         if (sync(coll, Parameters.getParameterCollection())) {
426             BaseServlet.provisioningDataChanged();
427             BaseServlet.provisioningParametersChanged();
428         }
429     }
430
431     private void syncIngressRoutes(JSONArray ja) {
432         Collection<Syncable> coll = new ArrayList<>();
433         for (int n = 0; n < ja.length(); n++) {
434             try {
435                 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
436                 coll.add(in);
437             } catch (NumberFormatException e) {
438                 logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n));
439             }
440         }
441         if (sync(coll, IngressRoute.getAllIngressRoutes())) {
442             BaseServlet.provisioningDataChanged();
443         }
444     }
445
446     private void syncEgressRoutes(JSONObject jo) {
447         Collection<Syncable> coll = new ArrayList<>();
448         for (String key : jo.keySet()) {
449             try {
450                 int sub = Integer.parseInt(key);
451                 String node = jo.getString(key);
452                 EgressRoute er = new EgressRoute(sub, node);
453                 coll.add(er);
454             } catch (NumberFormatException e) {
455                 logger.warn("PROV5004: Invalid subid in egress routes: " + key, e);
456             } catch (IllegalArgumentException e) {
457                 logger.warn("PROV5004: Invalid node name in egress routes: " + key, e);
458             }
459         }
460         if (sync(coll, EgressRoute.getAllEgressRoutes())) {
461             BaseServlet.provisioningDataChanged();
462         }
463     }
464
465     private void syncNetworkRoutes(JSONArray ja) {
466         Collection<Syncable> coll = new ArrayList<>();
467         for (int n = 0; n < ja.length(); n++) {
468             try {
469                 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
470                 coll.add(nr);
471             } catch (JSONException e) {
472                 logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e);
473             }
474         }
475         if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
476             BaseServlet.provisioningDataChanged();
477         }
478     }
479
480     private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
481         boolean changes = false;
482         try {
483             Map<String, Syncable> newmap = getMap(newc);
484             Map<String, Syncable> oldmap = getMap(oldc);
485             Set<String> union = new TreeSet<>(newmap.keySet());
486             union.addAll(oldmap.keySet());
487             DB db = new DB();
488             @SuppressWarnings("resource")
489             Connection conn = db.getConnection();
490             for (String n : union) {
491                 Syncable newobj = newmap.get(n);
492                 Syncable oldobj = oldmap.get(n);
493                 if (oldobj == null) {
494                     changes = insertRecord(conn, newobj);
495                 } else if (newobj == null) {
496                     changes = deleteRecord(conn, oldobj);
497                 } else if (!newobj.equals(oldobj)) {
498                     changes = updateRecord(conn, newobj, oldobj);
499                 }
500             }
501             db.release(conn);
502         } catch (SQLException e) {
503             logger.warn("PROV5009: problem during sync, exception: " + e);
504         }
505         return changes;
506     }
507
508     private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) {
509         if (logger.isDebugEnabled()) {
510             logger.debug("  Updating record: " + newobj);
511         }
512         boolean changes = newobj.doUpdate(conn);
513         checkChangeOwner(newobj, oldobj);
514
515         return changes;
516     }
517
518     private boolean deleteRecord(Connection conn, Syncable oldobj) {
519         if (logger.isDebugEnabled()) {
520             logger.debug("  Deleting record: " + oldobj);
521         }
522         return oldobj.doDelete(conn);
523     }
524
525     private boolean insertRecord(Connection conn, Syncable newobj) {
526         if (logger.isDebugEnabled()) {
527             logger.debug("  Inserting record: " + newobj);
528         }
529         return newobj.doInsert(conn);
530     }
531
532     private Map<String, Syncable> getMap(Collection<? extends Syncable> coll) {
533         Map<String, Syncable> map = new HashMap<>();
534         for (Syncable v : coll) {
535             map.put(v.getKey(), v);
536         }
537         return map;
538     }
539
540     /**
541      * Change owner of FEED/SUBSCRIPTION.
542      * Rally US708115 Change Ownership of FEED - 1610
543      */
544     private void checkChangeOwner(Syncable newobj, Syncable oldobj) {
545         if (newobj instanceof Feed) {
546             Feed oldfeed = (Feed) oldobj;
547             Feed newfeed = (Feed) newobj;
548
549             if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
550                 logger.info("PROV5013 -  Previous publisher: "
551                                     + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher());
552                 oldfeed.setPublisher(newfeed.getPublisher());
553                 oldfeed.changeOwnerShip();
554             }
555         } else if (newobj instanceof Subscription) {
556             Subscription oldsub = (Subscription) oldobj;
557             Subscription newsub = (Subscription) newobj;
558
559             if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
560                 logger.info("PROV5013 -  Previous subscriber: "
561                                     + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber());
562                 oldsub.setSubscriber(newsub.getSubscriber());
563                 oldsub.changeOwnerShip();
564             }
565         }
566
567     }
568
569     /**
570      * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
571      *
572      * @return the provisioning data (as a JONObject)
573      */
574     private synchronized JSONObject readProvisioningJson() {
575         String url = URLUtilities.generatePeerProvURL();
576         HttpGet get = new HttpGet(url);
577         try {
578             HttpResponse response = httpclient.execute(get);
579             int code = response.getStatusLine().getStatusCode();
580             if (code != HttpServletResponse.SC_OK) {
581                 logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code);
582                 return null;
583             }
584             HttpEntity entity = response.getEntity();
585             String ctype = entity.getContentType().getValue().trim();
586             if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1)
587                         && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
588                 logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype);
589                 return null;
590             }
591             return new JSONObject(new JSONTokener(entity.getContent()));
592         } catch (Exception e) {
593             logger.warn("PROV5012: readProvisioningJson failed, exception: " + e);
594             return null;
595         } finally {
596             get.releaseConnection();
597         }
598     }
599
600     /**
601      * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in
602      * the remote database.
603      *
604      * @return the bitset
605      */
606     RLEBitSet readRemoteLoglist() {
607         RLEBitSet bs = new RLEBitSet();
608         String url = URLUtilities.generatePeerLogsURL();
609
610         //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
611         if ("".equals(url)) {
612             return bs;
613         }
614         //End of fix.
615
616         HttpGet get = new HttpGet(url);
617         try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
618             HttpResponse response = httpclient.execute(get);
619             int code = response.getStatusLine().getStatusCode();
620             if (code != HttpServletResponse.SC_OK) {
621                 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code);
622                 return bs;
623             }
624             HttpEntity entity = response.getEntity();
625             String ctype = entity.getContentType().getValue().trim();
626             if (!TEXT_CT.equals(ctype)) {
627                 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
628                 return bs;
629             }
630             InputStream is = entity.getContent();
631             int ch;
632             while ((ch = is.read()) >= 0) {
633                 bos.write(ch);
634             }
635             bs.set(bos.toString());
636             is.close();
637         } catch (Exception e) {
638             logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e);
639             return bs;
640         } finally {
641             get.releaseConnection();
642         }
643         return bs;
644     }
645
646     /**
647      * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that
648      * we wish to copy to the local database.
649      *
650      * @param bs the bitset (an RELBitSet) of log records to fetch
651      */
652     void replicateDataRouterLogs(RLEBitSet bs) {
653         String url = URLUtilities.generatePeerLogsURL();
654         HttpPost post = new HttpPost(url);
655         try {
656             String str = bs.toString();
657             HttpEntity body = new ByteArrayEntity(str.getBytes(), ContentType.create(TEXT_CT));
658             post.setEntity(body);
659             if (logger.isDebugEnabled()) {
660                 logger.debug("Requesting records: " + str);
661             }
662
663             HttpResponse response = httpclient.execute(post);
664             int code = response.getStatusLine().getStatusCode();
665             if (code != HttpServletResponse.SC_OK) {
666                 logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code);
667                 return;
668             }
669             HttpEntity entity = response.getEntity();
670             String ctype = entity.getContentType().getValue().trim();
671             if (!TEXT_CT.equals(ctype)) {
672                 logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype);
673                 return;
674             }
675
676             String spoolname = "" + System.currentTimeMillis();
677             Path tmppath = Paths.get(spooldir, spoolname);
678             Path donepath = Paths.get(spooldir, "IN." + spoolname);
679             Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
680             Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
681             logger.info("Approximately " + bs.cardinality() + " records replicated.");
682         } catch (Exception e) {
683             logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e);
684         } finally {
685             post.releaseConnection();
686         }
687     }
688 }