Remove major and minor code smells in dr-prov
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / SynchronizerTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning;
26
27 import java.io.ByteArrayOutputStream;
28 import java.io.File;
29 import java.io.FileInputStream;
30 import java.io.InputStream;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.nio.file.Files;
34 import java.nio.file.Path;
35 import java.nio.file.Paths;
36 import java.nio.file.StandardCopyOption;
37 import java.security.KeyStore;
38 import java.sql.Connection;
39 import java.sql.SQLException;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.HashMap;
44 import java.util.Map;
45 import java.util.Properties;
46 import java.util.Set;
47 import java.util.Timer;
48 import java.util.TimerTask;
49 import java.util.TreeSet;
50
51 import javax.servlet.http.HttpServletResponse;
52
53 import com.att.eelf.configuration.EELFLogger;
54 import com.att.eelf.configuration.EELFManager;
55 import org.apache.http.HttpEntity;
56 import org.apache.http.HttpResponse;
57 import org.apache.http.client.methods.HttpGet;
58 import org.apache.http.client.methods.HttpPost;
59 import org.apache.http.conn.scheme.Scheme;
60 import org.apache.http.conn.ssl.SSLSocketFactory;
61 import org.apache.http.entity.ByteArrayEntity;
62 import org.apache.http.entity.ContentType;
63 import org.apache.http.impl.client.AbstractHttpClient;
64 import org.apache.http.impl.client.DefaultHttpClient;
65 import org.json.JSONArray;
66 import org.json.JSONException;
67 import org.json.JSONObject;
68 import org.json.JSONTokener;
69 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
70 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
71 import org.onap.dmaap.datarouter.provisioning.beans.Group;
72 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
73 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
74 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
75 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
76 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
77 import org.onap.dmaap.datarouter.provisioning.utils.DB;
78 import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
79 import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
80 import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
81
82 /**
83  * This class handles synchronization between provisioning servers (PODs).  It has three primary functions:
84  * <ol>
85  * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
86  * the active (master) POD.</li>
87  * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
88  * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)
89  * of this POD.</li>
90  * </ol>
91  * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
92  * <code>
93  * Security.setProperty("networkaddress.cache.ttl", "10");
94  * </code>
95  *
96  * @author Robert Eby
97  * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
98  */
99 public class SynchronizerTask extends TimerTask {
100
101     /**
102      * This is a singleton -- there is only one SynchronizerTask object in the server
103      */
104     private static SynchronizerTask synctask;
105
106     /**
107      * This POD is unknown -- not on the list of PODs
108      */
109     public static final int UNKNOWN = 0;
110     /**
111      * This POD is active -- on the list of PODs, and the DNS CNAME points to us
112      */
113     public static final int ACTIVE = 1;
114     /**
115      * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us
116      */
117     public static final int STANDBY = 2;
118     private static final String[] stnames = {"UNKNOWN", "ACTIVE", "STANDBY"};
119     private static final long ONE_HOUR = 60 * 60 * 1000L;
120
121     private final EELFLogger logger;
122     private final Timer rolex;
123     private final String spooldir;
124     private int state;
125     private boolean doFetch;
126     private long nextsynctime;
127     private AbstractHttpClient httpclient = null;
128
129     /**
130      * Get the singleton SynchronizerTask object.
131      *
132      * @return the SynchronizerTask
133      */
134     public static synchronized SynchronizerTask getSynchronizer() {
135         if (synctask == null) {
136             synctask = new SynchronizerTask();
137         }
138         return synctask;
139     }
140
141     @SuppressWarnings("deprecation")
142     private SynchronizerTask() {
143         logger = EELFManager.getInstance().getLogger("InternalLog");
144         rolex = new Timer();
145         spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
146         state = UNKNOWN;
147         doFetch = true;        // start off with a fetch
148         nextsynctime = 0;
149
150         logger.info("PROV5000: Sync task starting, server state is UNKNOWN");
151         try {
152             Properties props = (new DB()).getProperties();
153             String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
154             String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
155             String pass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
156             KeyStore keyStore = KeyStore.getInstance(type);
157             try(FileInputStream instream = new FileInputStream(new File(store))) {
158                 keyStore.load(instream, pass.toCharArray());
159
160             }
161                 store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
162                 pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY);
163                 KeyStore trustStore = null;
164                 if (store != null && store.length() > 0) {
165                     trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
166                     try(FileInputStream instream = new FileInputStream(new File(store))){
167                         trustStore.load(instream, pass.toCharArray());
168
169                     }
170                 }
171
172             // We are connecting with the node name, but the certificate will have the CNAME
173             // So we need to accept a non-matching certificate name
174             String keystorepass = props.getProperty(
175                 Main.KEYSTORE_PASS_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref
176            try(AbstractHttpClient hc = new DefaultHttpClient()) {
177                SSLSocketFactory socketFactory =
178                        (trustStore == null)
179                                ? new SSLSocketFactory(keyStore, keystorepass)
180                                : new SSLSocketFactory(keyStore, keystorepass, trustStore);
181                socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
182                Scheme sch = new Scheme("https", 443, socketFactory);
183                hc.getConnectionManager().getSchemeRegistry().register(sch);
184             httpclient = hc;
185            }
186             // Run once every 5 seconds to check DNS, etc.
187             long interval = 0;
188             try {
189                 String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
190                 interval = Long.parseLong(s);
191             } catch (NumberFormatException e) {
192                 interval = 5000L;
193             }
194             rolex.scheduleAtFixedRate(this, 0L, interval);
195         } catch (Exception e) {
196             logger.warn("PROV5005: Problem starting the synchronizer: " + e);
197         }
198     }
199
200     /**
201      * What is the state of this POD?
202      *
203      * @return one of ACTIVE, STANDBY, UNKNOWN
204      */
205     public int getState() {
206         return state;
207     }
208
209     /**
210      * Is this the active POD?
211      *
212      * @return true if we are active (the master), false otherwise
213      */
214     public boolean isActive() {
215         return state == ACTIVE;
216     }
217
218     /**
219      * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
220      * should re-synchronize with the master.
221      */
222     public void doFetch() {
223         doFetch = true;
224     }
225
226     /**
227      * Runs once a minute in order to <ol>
228      * <li>lookup DNS names,</li>
229      * <li>determine the state of this POD,</li>
230      * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>
231      * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
232      * </ol>
233      */
234     @Override
235     public void run() {
236         try {
237             state = lookupState();
238             if (state == STANDBY) {
239                 // Only copy provisioning data FROM the active server TO the standby
240                 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
241                     logger.debug("Initiating a sync...");
242                     JSONObject jo = readProvisioningJSON();
243                     if (jo != null) {
244                         doFetch = false;
245                         syncFeeds(jo.getJSONArray("feeds"));
246                         syncSubs(jo.getJSONArray("subscriptions"));
247                         syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
248                         syncParams(jo.getJSONObject("parameters"));
249                         // The following will not be present in a version=1.0 provfeed
250                         JSONArray ja = jo.optJSONArray("ingress");
251                         if (ja != null) {
252                             syncIngressRoutes(ja);
253                         }
254                         JSONObject j2 = jo.optJSONObject("egress");
255                         if (j2 != null) {
256                             syncEgressRoutes(j2);
257                         }
258                         ja = jo.optJSONArray("routing");
259                         if (ja != null) {
260                             syncNetworkRoutes(ja);
261                         }
262                     }
263                     logger.info("PROV5013: Sync completed.");
264                     nextsynctime = System.currentTimeMillis() + ONE_HOUR;
265                 }
266             } else {
267                 // Don't do fetches on non-standby PODs
268                 doFetch = false;
269             }
270
271             // Fetch DR logs as needed - server to server
272             LogfileLoader lfl = LogfileLoader.getLoader();
273             if (lfl.isIdle()) {
274                 // Only fetch new logs if the loader is waiting for them.
275                 logger.trace("Checking for logs to replicate...");
276                 RLEBitSet local = lfl.getBitSet();
277                 RLEBitSet remote = readRemoteLoglist();
278                 remote.andNot(local);
279                 if (!remote.isEmpty()) {
280                     logger.debug(" Replicating logs: " + remote);
281                     replicateDRLogs(remote);
282                 }
283             }
284         } catch (Exception e) {
285             logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e);
286         }
287     }
288
289     /**
290      * This method is used to lookup the CNAME that points to the active server. It returns 0 (UNKNOWN), 1(ACTIVE), or 2
291      * (STANDBY) to indicate the state of this server.
292      *
293      * @return the current state
294      */
295     private int lookupState() {
296         int newstate = UNKNOWN;
297         try {
298             InetAddress myaddr = InetAddress.getLocalHost();
299             if (logger.isTraceEnabled()) {
300                 logger.trace("My address: " + myaddr);
301             }
302             String thisPod = myaddr.getHostName();
303             Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
304             if (pods.contains(thisPod)) {
305                 InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
306                 newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;
307                 if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
308                     logger.debug("Active POD = " + pserver + ", Current state is " + stnames[newstate]);
309                     nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
310                 }
311             } else {
312                 logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers.");
313             }
314         } catch (UnknownHostException e) {
315             logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
316         }
317
318         if (newstate != state) {
319             logger
320                 .info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));
321         }
322         return newstate;
323     }
324
325     private static long nextMsg = 0;    // only display the "Current state" msg every 5 mins.
326
327     /**
328      * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
329      */
330     private void syncFeeds(JSONArray ja) {
331         Collection<Syncable> coll = new ArrayList<>();
332         for (int n = 0; n < ja.length(); n++) {
333             try {
334                 Feed f = new Feed(ja.getJSONObject(n));
335                 coll.add(f);
336             } catch (Exception e) {
337                 logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n), e);
338             }
339         }
340         if (sync(coll, Feed.getAllFeeds())) {
341             BaseServlet.provisioningDataChanged();
342         }
343     }
344
345     /**
346      * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB.
347      */
348     private void syncSubs(JSONArray ja) {
349         Collection<Syncable> coll = new ArrayList<>();
350         for (int n = 0; n < ja.length(); n++) {
351             try {
352                 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
353                 JSONObject j = ja.getJSONObject(n);
354                 j.put("sync", "true");
355                 Subscription s = new Subscription(j);
356                 coll.add(s);
357             } catch (Exception e) {
358                 logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n), e);
359             }
360         }
361         if (sync(coll, Subscription.getAllSubscriptions())) {
362             BaseServlet.provisioningDataChanged();
363         }
364     }
365
366     /**
367      * Rally:US708115  - Synchronize the Groups in the JSONArray, with the Groups in the DB.
368      */
369     private void syncGroups(JSONArray ja) {
370         Collection<Syncable> coll = new ArrayList<>();
371         for (int n = 0; n < ja.length(); n++) {
372             try {
373                 Group g = new Group(ja.getJSONObject(n));
374                 coll.add(g);
375             } catch (Exception e) {
376                 logger.warn("PROV5004: Invalid object in group: " + ja.optJSONObject(n), e);
377             }
378         }
379         if (sync(coll, Group.getAllgroups())) {
380             BaseServlet.provisioningDataChanged();
381         }
382     }
383
384
385     /**
386      * Synchronize the Parameters in the JSONObject, with the Parameters in the DB.
387      */
388     private void syncParams(JSONObject jo) {
389         Collection<Syncable> coll = new ArrayList<>();
390         for (String k : jo.keySet()) {
391             String v = "";
392             try {
393                 v = jo.getString(k);
394             } catch (JSONException e) {
395                 logger.warn("PROV5004: Invalid object in parameters: " + jo.optJSONObject(k), e);
396                 try {
397                     v = "" + jo.getInt(k);
398                 } catch (JSONException e1) {
399                     logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e);
400                     JSONArray ja = jo.getJSONArray(k);
401                     for (int i = 0; i < ja.length(); i++) {
402                         if (i > 0) {
403                             v += "|";
404                         }
405                         v += ja.getString(i);
406                     }
407                 }
408             }
409             coll.add(new Parameters(k, v));
410         }
411         if (sync(coll, Parameters.getParameterCollection())) {
412             BaseServlet.provisioningDataChanged();
413             BaseServlet.provisioningParametersChanged();
414         }
415     }
416
417     private void syncIngressRoutes(JSONArray ja) {
418         Collection<Syncable> coll = new ArrayList<>();
419         for (int n = 0; n < ja.length(); n++) {
420             try {
421                 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
422                 coll.add(in);
423             } catch (NumberFormatException e) {
424                 logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n));
425             }
426         }
427         if (sync(coll, IngressRoute.getAllIngressRoutes())) {
428             BaseServlet.provisioningDataChanged();
429         }
430     }
431
432     private void syncEgressRoutes(JSONObject jo) {
433         Collection<Syncable> coll = new ArrayList<>();
434         for (String key : jo.keySet()) {
435             try {
436                 int sub = Integer.parseInt(key);
437                 String node = jo.getString(key);
438                 EgressRoute er = new EgressRoute(sub, node);
439                 coll.add(er);
440             } catch (NumberFormatException e) {
441                 logger.warn("PROV5004: Invalid subid in egress routes: " + key, e);
442             } catch (IllegalArgumentException e) {
443                 logger.warn("PROV5004: Invalid node name in egress routes: " + key, e);
444             }
445         }
446         if (sync(coll, EgressRoute.getAllEgressRoutes())) {
447             BaseServlet.provisioningDataChanged();
448         }
449     }
450
451     private void syncNetworkRoutes(JSONArray ja) {
452         Collection<Syncable> coll = new ArrayList<>();
453         for (int n = 0; n < ja.length(); n++) {
454             try {
455                 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
456                 coll.add(nr);
457             } catch (JSONException e) {
458                 logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n));
459             }
460         }
461         if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
462             BaseServlet.provisioningDataChanged();
463         }
464     }
465
466     private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
467         boolean changes = false;
468         try {
469             Map<String, Syncable> newmap = getMap(newc);
470             Map<String, Syncable> oldmap = getMap(oldc);
471             Set<String> union = new TreeSet<>(newmap.keySet());
472             union.addAll(oldmap.keySet());
473             DB db = new DB();
474             @SuppressWarnings("resource")
475             Connection conn = db.getConnection();
476             for (String n : union) {
477                 Syncable newobj = newmap.get(n);
478                 Syncable oldobj = oldmap.get(n);
479                 if (oldobj == null) {
480                     if (logger.isDebugEnabled()) {
481                         logger.debug("  Inserting record: " + newobj);
482                     }
483                     newobj.doInsert(conn);
484                     changes = true;
485                 } else if (newobj == null) {
486                     if (logger.isDebugEnabled()) {
487                         logger.debug("  Deleting record: " + oldobj);
488                     }
489                     oldobj.doDelete(conn);
490                     changes = true;
491                 } else if (!newobj.equals(oldobj)) {
492                     if (logger.isDebugEnabled()) {
493                         logger.debug("  Updating record: " + newobj);
494                     }
495                     newobj.doUpdate(conn);
496
497                     /**Rally US708115
498                      * Change Ownership of FEED - 1610, Syncronised with secondary DB.
499                      * */
500                     checkChnageOwner(newobj, oldobj);
501
502                     changes = true;
503                 }
504             }
505             db.release(conn);
506         } catch (SQLException e) {
507             logger.warn("PROV5009: problem during sync, exception: " + e);
508         }
509         return changes;
510     }
511
512     private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {
513         Map<String, Syncable> map = new HashMap<>();
514         for (Syncable v : c) {
515             map.put(v.getKey(), v);
516         }
517         return map;
518     }
519
520     /**Change owner of FEED/SUBSCRIPTION*/
521     /**
522      * Rally US708115 Change Ownership of FEED - 1610
523      */
524     private void checkChnageOwner(Syncable newobj, Syncable oldobj) {
525         if (newobj instanceof Feed) {
526             Feed oldfeed = (Feed) oldobj;
527             Feed newfeed = (Feed) newobj;
528
529             if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
530                 logger.info("PROV5013 -  Previous publisher: " + oldfeed.getPublisher() + ": New publisher-" + newfeed
531                     .getPublisher());
532                 oldfeed.setPublisher(newfeed.getPublisher());
533                 oldfeed.changeOwnerShip();
534             }
535         } else if (newobj instanceof Subscription) {
536             Subscription oldsub = (Subscription) oldobj;
537             Subscription newsub = (Subscription) newobj;
538
539             if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
540                 logger.info("PROV5013 -  Previous subscriber: " + oldsub.getSubscriber() + ": New subscriber-" + newsub
541                     .getSubscriber());
542                 oldsub.setSubscriber(newsub.getSubscriber());
543                 oldsub.changeOwnerShip();
544             }
545         }
546
547     }
548
549     /**
550      * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
551      *
552      * @return the provisioning data (as a JONObject)
553      */
554     private synchronized JSONObject readProvisioningJSON() {
555         String url = URLUtilities.generatePeerProvURL();
556         HttpGet get = new HttpGet(url);
557         try {
558             HttpResponse response = httpclient.execute(get);
559             int code = response.getStatusLine().getStatusCode();
560             if (code != HttpServletResponse.SC_OK) {
561                 logger.warn("PROV5010: readProvisioningJSON failed, bad error code: " + code);
562                 return null;
563             }
564             HttpEntity entity = response.getEntity();
565             String ctype = entity.getContentType().getValue().trim();
566             if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype
567                 .equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
568                 logger.warn("PROV5011: readProvisioningJSON failed, bad content type: " + ctype);
569                 return null;
570             }
571             return new JSONObject(new JSONTokener(entity.getContent()));
572         } catch (Exception e) {
573             logger.warn("PROV5012: readProvisioningJSON failed, exception: " + e);
574             return null;
575         } finally {
576             get.releaseConnection();
577         }
578     }
579
580     /**
581      * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in
582      * the remote database.
583      *
584      * @return the bitset
585      */
586     private RLEBitSet readRemoteLoglist() {
587         RLEBitSet bs = new RLEBitSet();
588         String url = URLUtilities.generatePeerLogsURL();
589
590         //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
591         if ("".equals(url)) {
592             return bs;
593         }
594         //End of fix.
595
596         HttpGet get = new HttpGet(url);
597         try {
598             HttpResponse response = httpclient.execute(get);
599             int code = response.getStatusLine().getStatusCode();
600             if (code != HttpServletResponse.SC_OK) {
601                 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code);
602                 return bs;
603             }
604             HttpEntity entity = response.getEntity();
605             String ctype = entity.getContentType().getValue().trim();
606             if (!"text/plain".equals(ctype)) {
607                 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
608                 return bs;
609             }
610             InputStream is = entity.getContent();
611             ByteArrayOutputStream bos = new ByteArrayOutputStream();
612             int ch = 0;
613             while ((ch = is.read()) >= 0) {
614                 bos.write(ch);
615             }
616             bs.set(bos.toString());
617             is.close();
618         } catch (Exception e) {
619             logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e);
620             return bs;
621         } finally {
622             get.releaseConnection();
623         }
624         return bs;
625     }
626
627     /**
628      * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that
629      * we wish to copy to the local database.
630      *
631      * @param bs the bitset (an RELBitSet) of log records to fetch
632      */
633     private void replicateDRLogs(RLEBitSet bs) {
634         String url = URLUtilities.generatePeerLogsURL();
635         HttpPost post = new HttpPost(url);
636         try {
637             String t = bs.toString();
638             HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));
639             post.setEntity(body);
640             if (logger.isDebugEnabled()) {
641                 logger.debug("Requesting records: " + t);
642             }
643
644             HttpResponse response = httpclient.execute(post);
645             int code = response.getStatusLine().getStatusCode();
646             if (code != HttpServletResponse.SC_OK) {
647                 logger.warn("PROV5010: replicateDRLogs failed, bad error code: " + code);
648                 return;
649             }
650             HttpEntity entity = response.getEntity();
651             String ctype = entity.getContentType().getValue().trim();
652             if (!"text/plain".equals(ctype)) {
653                 logger.warn("PROV5011: replicateDRLogs failed, bad content type: " + ctype);
654                 return;
655             }
656
657             String spoolname = "" + System.currentTimeMillis();
658             Path tmppath = Paths.get(spooldir, spoolname);
659             Path donepath = Paths.get(spooldir, "IN." + spoolname);
660             Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
661             Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
662             logger.info("Approximately " + bs.cardinality() + " records replicated.");
663         } catch (Exception e) {
664             logger.warn("PROV5012: replicateDRLogs failed, exception: " + e);
665         } finally {
666             post.releaseConnection();
667         }
668     }
669 }