Checkstyle fixes for datarouter prov
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / SynchronizerTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning;
26
27 import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT;
28
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
31
32 import java.io.ByteArrayOutputStream;
33 import java.io.File;
34 import java.io.FileInputStream;
35 import java.io.InputStream;
36 import java.net.InetAddress;
37 import java.net.UnknownHostException;
38 import java.nio.file.Files;
39 import java.nio.file.Path;
40 import java.nio.file.Paths;
41 import java.nio.file.StandardCopyOption;
42 import java.security.KeyStore;
43 import java.sql.Connection;
44 import java.sql.SQLException;
45 import java.util.ArrayList;
46 import java.util.Arrays;
47 import java.util.Collection;
48 import java.util.HashMap;
49 import java.util.Map;
50 import java.util.Properties;
51 import java.util.Set;
52 import java.util.Timer;
53 import java.util.TimerTask;
54 import java.util.TreeSet;
55
56 import javax.servlet.http.HttpServletResponse;
57
58 import org.apache.http.HttpEntity;
59 import org.apache.http.HttpResponse;
60 import org.apache.http.client.methods.HttpGet;
61 import org.apache.http.client.methods.HttpPost;
62 import org.apache.http.conn.scheme.Scheme;
63 import org.apache.http.conn.ssl.SSLSocketFactory;
64 import org.apache.http.entity.ByteArrayEntity;
65 import org.apache.http.entity.ContentType;
66 import org.apache.http.impl.client.AbstractHttpClient;
67 import org.apache.http.impl.client.DefaultHttpClient;
68 import org.json.JSONArray;
69 import org.json.JSONException;
70 import org.json.JSONObject;
71 import org.json.JSONTokener;
72 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
73 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
74 import org.onap.dmaap.datarouter.provisioning.beans.Group;
75 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
76 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
77 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
78 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
79 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
80 import org.onap.dmaap.datarouter.provisioning.utils.DB;
81 import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
82 import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
83 import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
84
85 /**
86  * This class handles synchronization between provisioning servers (PODs).  It has three primary functions:
87  * <ol>
88  * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
89  * the active (master) POD.</li>
90  * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
91  * <li>Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD)
92  * of this POD.</li>
93  * </ol>
94  * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
95  * <code>
96  * Security.setProperty("networkaddress.cache.ttl", "10");
97  * </code>
98  *
99  * @author Robert Eby
100  * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
101  */
102
103 public class SynchronizerTask extends TimerTask {
104
105     /**
106      * This is a singleton -- there is only one SynchronizerTask object in the server.
107      */
108     private static SynchronizerTask synctask;
109
110     /**
111      * This POD is unknown -- not on the list of PODs.
112      */
113     public static final int UNKNOWN_POD = 0;
114     /**
115      * This POD is active -- on the list of PODs, and the DNS CNAME points to us.
116      */
117     public static final int ACTIVE_POD = 1;
118     /**
119      * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us.
120      */
121     public static final int STANDBY_POD = 2;
122
123     private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"};
124     private static final long ONE_HOUR = 60 * 60 * 1000L;
125
126     private long nextMsg = 0;    // only display the "Current podState" msg every 5 mins.
127
128     private final EELFLogger logger;
129     private final Timer rolex;
130     private final String spooldir;
131     private int podState;
132     private boolean doFetch;
133     private long nextsynctime;
134     private AbstractHttpClient httpclient = null;
135
136     @SuppressWarnings("deprecation")
137     private SynchronizerTask() {
138         logger = EELFManager.getInstance().getLogger("InternalLog");
139         rolex = new Timer();
140         spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
141         podState = UNKNOWN_POD;
142         doFetch = true;        // start off with a fetch
143         nextsynctime = 0;
144
145         logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD");
146         try {
147             Properties props = (new DB()).getProperties();
148             String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
149             String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
150             String pass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
151             KeyStore keyStore = KeyStore.getInstance(type);
152             try (FileInputStream instream = new FileInputStream(new File(store))) {
153                 keyStore.load(instream, pass.toCharArray());
154
155             }
156             store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
157             pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY);
158             KeyStore trustStore = null;
159             if (store != null && store.length() > 0) {
160                 trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
161                 try (FileInputStream instream = new FileInputStream(new File(store))) {
162                     trustStore.load(instream, pass.toCharArray());
163
164                 }
165             }
166
167             // We are connecting with the node name, but the certificate will have the CNAME
168             // So we need to accept a non-matching certificate name
169             String keystorepass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
170             try (AbstractHttpClient hc = new DefaultHttpClient()) {
171                 SSLSocketFactory socketFactory =
172                         (trustStore == null)
173                                 ? new SSLSocketFactory(keyStore, keystorepass)
174                                 : new SSLSocketFactory(keyStore, keystorepass, trustStore);
175                 socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
176                 Scheme sch = new Scheme("https", 443, socketFactory);
177                 hc.getConnectionManager().getSchemeRegistry().register(sch);
178                 httpclient = hc;
179             }
180             setSynchTimer(props);
181         } catch (Exception e) {
182             logger.warn("PROV5005: Problem starting the synchronizer: " + e);
183         }
184     }
185
186     private void setSynchTimer(Properties props) {
187         // Run once every 5 seconds to check DNS, etc.
188         long interval;
189         try {
190             String str = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
191             interval = Long.parseLong(str);
192         } catch (NumberFormatException e) {
193             interval = 5000L;
194         }
195         rolex.scheduleAtFixedRate(this, 0L, interval);
196     }
197
198     /**
199      * Get the singleton SynchronizerTask object.
200      *
201      * @return the SynchronizerTask
202      */
203     public static synchronized SynchronizerTask getSynchronizer() {
204         if (synctask == null) {
205             synctask = new SynchronizerTask();
206         }
207         return synctask;
208     }
209
210     /**
211      * What is the podState of this POD?.
212      *
213      * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD
214      */
215     public int getPodState() {
216         return podState;
217     }
218
219     /**
220      * Is this the active POD?.
221      *
222      * @return true if we are active (the master), false otherwise
223      */
224     public boolean isActive() {
225         return podState == ACTIVE_POD;
226     }
227
228     /**
229      * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
230      * should re-synchronize with the master.
231      */
232     void doFetch() {
233         doFetch = true;
234     }
235
236     /**
237      * Runs once a minute in order to <ol>
238      * <li>lookup DNS names,</li>
239      * <li>determine the podState of this POD,</li>
240      * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.</li>
241      * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
242      * </ol>.
243      */
244     @Override
245     public void run() {
246         try {
247             podState = lookupState();
248             if (podState == STANDBY_POD) {
249                 // Only copy provisioning data FROM the active server TO the standby
250                 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
251                     syncProvisioningData();
252                     logger.info("PROV5013: Sync completed.");
253                     nextsynctime = System.currentTimeMillis() + ONE_HOUR;
254                 }
255             } else {
256                 // Don't do fetches on non-standby PODs
257                 doFetch = false;
258             }
259
260             // Fetch DR logs as needed - server to server
261             LogfileLoader lfl = LogfileLoader.getLoader();
262             if (lfl.isIdle()) {
263                 // Only fetch new logs if the loader is waiting for them.
264                 logger.trace("Checking for logs to replicate...");
265                 RLEBitSet local = lfl.getBitSet();
266                 RLEBitSet remote = readRemoteLoglist();
267                 remote.andNot(local);
268                 if (!remote.isEmpty()) {
269                     logger.debug(" Replicating logs: " + remote);
270                     replicateDataRouterLogs(remote);
271                 }
272             }
273         } catch (Exception e) {
274             logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e);
275         }
276     }
277
278     private void syncProvisioningData() {
279         logger.debug("Initiating a sync...");
280         JSONObject jo = readProvisioningJson();
281         if (jo != null) {
282             doFetch = false;
283             syncFeeds(jo.getJSONArray("feeds"));
284             syncSubs(jo.getJSONArray("subscriptions"));
285             syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
286             syncParams(jo.getJSONObject("parameters"));
287             // The following will not be present in a version=1.0 provfeed
288             JSONArray ja = jo.optJSONArray("ingress");
289             if (ja != null) {
290                 syncIngressRoutes(ja);
291             }
292             JSONObject j2 = jo.optJSONObject("egress");
293             if (j2 != null) {
294                 syncEgressRoutes(j2);
295             }
296             ja = jo.optJSONArray("routing");
297             if (ja != null) {
298                 syncNetworkRoutes(ja);
299             }
300         }
301     }
302
303     /**
304      * This method is used to lookup the CNAME that points to the active server.
305      * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server.
306      *
307      * @return the current podState
308      */
309     int lookupState() {
310         int newPodState = UNKNOWN_POD;
311         try {
312             InetAddress myaddr = InetAddress.getLocalHost();
313             if (logger.isTraceEnabled()) {
314                 logger.trace("My address: " + myaddr);
315             }
316             String thisPod = myaddr.getHostName();
317             Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
318             if (pods.contains(thisPod)) {
319                 InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
320                 newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD;
321                 if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
322                     logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]);
323                     nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
324                 }
325             } else {
326                 logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers.");
327             }
328         } catch (UnknownHostException e) {
329             logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
330         }
331
332         if (newPodState != podState) {
333             logger.info(String.format("PROV5001: Server podState changed from %s to %s",
334                     stnames[podState], stnames[newPodState]));
335         }
336         return newPodState;
337     }
338
339     /**
340      * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
341      */
342     private void syncFeeds(JSONArray ja) {
343         Collection<Syncable> coll = new ArrayList<>();
344         for (int n = 0; n < ja.length(); n++) {
345             try {
346                 Feed feed = new Feed(ja.getJSONObject(n));
347                 coll.add(feed);
348             } catch (Exception e) {
349                 logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n), e);
350             }
351         }
352         if (sync(coll, Feed.getAllFeeds())) {
353             BaseServlet.provisioningDataChanged();
354         }
355     }
356
357     /**
358      * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB.
359      */
360     private void syncSubs(JSONArray ja) {
361         Collection<Syncable> coll = new ArrayList<>();
362         for (int n = 0; n < ja.length(); n++) {
363             try {
364                 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
365                 JSONObject jsonObject = ja.getJSONObject(n);
366                 jsonObject.put("sync", "true");
367                 Subscription sub = new Subscription(jsonObject);
368                 coll.add(sub);
369             } catch (Exception e) {
370                 logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n), e);
371             }
372         }
373         if (sync(coll, Subscription.getAllSubscriptions())) {
374             BaseServlet.provisioningDataChanged();
375         }
376     }
377
378     /**
379      * Rally:US708115  - Synchronize the Groups in the JSONArray, with the Groups in the DB.
380      */
381     private void syncGroups(JSONArray ja) {
382         Collection<Syncable> coll = new ArrayList<>();
383         for (int n = 0; n < ja.length(); n++) {
384             try {
385                 Group group = new Group(ja.getJSONObject(n));
386                 coll.add(group);
387             } catch (Exception e) {
388                 logger.warn("PROV5004: Invalid object in group: " + ja.optJSONObject(n), e);
389             }
390         }
391         if (sync(coll, Group.getAllgroups())) {
392             BaseServlet.provisioningDataChanged();
393         }
394     }
395
396
397     /**
398      * Synchronize the Parameters in the JSONObject, with the Parameters in the DB.
399      */
400     private void syncParams(JSONObject jo) {
401         Collection<Syncable> coll = new ArrayList<>();
402         for (String k : jo.keySet()) {
403             String val = "";
404             try {
405                 val = jo.getString(k);
406             } catch (JSONException e) {
407                 logger.warn("PROV5004: Invalid object in parameters: " + jo.optJSONObject(k), e);
408                 try {
409                     val = "" + jo.getInt(k);
410                 } catch (JSONException e1) {
411                     logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1);
412                     JSONArray ja = jo.getJSONArray(k);
413                     for (int i = 0; i < ja.length(); i++) {
414                         if (i > 0) {
415                             val += "|";
416                         }
417                         val += ja.getString(i);
418                     }
419                 }
420             }
421             coll.add(new Parameters(k, val));
422         }
423         if (sync(coll, Parameters.getParameterCollection())) {
424             BaseServlet.provisioningDataChanged();
425             BaseServlet.provisioningParametersChanged();
426         }
427     }
428
429     private void syncIngressRoutes(JSONArray ja) {
430         Collection<Syncable> coll = new ArrayList<>();
431         for (int n = 0; n < ja.length(); n++) {
432             try {
433                 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
434                 coll.add(in);
435             } catch (NumberFormatException e) {
436                 logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n));
437             }
438         }
439         if (sync(coll, IngressRoute.getAllIngressRoutes())) {
440             BaseServlet.provisioningDataChanged();
441         }
442     }
443
444     private void syncEgressRoutes(JSONObject jo) {
445         Collection<Syncable> coll = new ArrayList<>();
446         for (String key : jo.keySet()) {
447             try {
448                 int sub = Integer.parseInt(key);
449                 String node = jo.getString(key);
450                 EgressRoute er = new EgressRoute(sub, node);
451                 coll.add(er);
452             } catch (NumberFormatException e) {
453                 logger.warn("PROV5004: Invalid subid in egress routes: " + key, e);
454             } catch (IllegalArgumentException e) {
455                 logger.warn("PROV5004: Invalid node name in egress routes: " + key, e);
456             }
457         }
458         if (sync(coll, EgressRoute.getAllEgressRoutes())) {
459             BaseServlet.provisioningDataChanged();
460         }
461     }
462
463     private void syncNetworkRoutes(JSONArray ja) {
464         Collection<Syncable> coll = new ArrayList<>();
465         for (int n = 0; n < ja.length(); n++) {
466             try {
467                 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
468                 coll.add(nr);
469             } catch (JSONException e) {
470                 logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e);
471             }
472         }
473         if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
474             BaseServlet.provisioningDataChanged();
475         }
476     }
477
478     private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
479         boolean changes = false;
480         try {
481             Map<String, Syncable> newmap = getMap(newc);
482             Map<String, Syncable> oldmap = getMap(oldc);
483             Set<String> union = new TreeSet<>(newmap.keySet());
484             union.addAll(oldmap.keySet());
485             DB db = new DB();
486             @SuppressWarnings("resource")
487             Connection conn = db.getConnection();
488             for (String n : union) {
489                 Syncable newobj = newmap.get(n);
490                 Syncable oldobj = oldmap.get(n);
491                 if (oldobj == null) {
492                     changes = insertRecord(conn, newobj);
493                 } else if (newobj == null) {
494                     changes = deleteRecord(conn, oldobj);
495                 } else if (!newobj.equals(oldobj)) {
496                     changes = updateRecord(conn, newobj, oldobj);
497                 }
498             }
499             db.release(conn);
500         } catch (SQLException e) {
501             logger.warn("PROV5009: problem during sync, exception: " + e);
502         }
503         return changes;
504     }
505
506     private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) {
507         if (logger.isDebugEnabled()) {
508             logger.debug("  Updating record: " + newobj);
509         }
510         boolean changes = newobj.doUpdate(conn);
511         checkChangeOwner(newobj, oldobj);
512
513         return changes;
514     }
515
516     private boolean deleteRecord(Connection conn, Syncable oldobj) {
517         if (logger.isDebugEnabled()) {
518             logger.debug("  Deleting record: " + oldobj);
519         }
520         return oldobj.doDelete(conn);
521     }
522
523     private boolean insertRecord(Connection conn, Syncable newobj) {
524         if (logger.isDebugEnabled()) {
525             logger.debug("  Inserting record: " + newobj);
526         }
527         return newobj.doInsert(conn);
528     }
529
530     private Map<String, Syncable> getMap(Collection<? extends Syncable> coll) {
531         Map<String, Syncable> map = new HashMap<>();
532         for (Syncable v : coll) {
533             map.put(v.getKey(), v);
534         }
535         return map;
536     }
537
538     /**
539      * Change owner of FEED/SUBSCRIPTION.
540      * Rally US708115 Change Ownership of FEED - 1610
541      */
542     private void checkChangeOwner(Syncable newobj, Syncable oldobj) {
543         if (newobj instanceof Feed) {
544             Feed oldfeed = (Feed) oldobj;
545             Feed newfeed = (Feed) newobj;
546
547             if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
548                 logger.info("PROV5013 -  Previous publisher: "
549                                     + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher());
550                 oldfeed.setPublisher(newfeed.getPublisher());
551                 oldfeed.changeOwnerShip();
552             }
553         } else if (newobj instanceof Subscription) {
554             Subscription oldsub = (Subscription) oldobj;
555             Subscription newsub = (Subscription) newobj;
556
557             if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
558                 logger.info("PROV5013 -  Previous subscriber: "
559                                     + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber());
560                 oldsub.setSubscriber(newsub.getSubscriber());
561                 oldsub.changeOwnerShip();
562             }
563         }
564
565     }
566
567     /**
568      * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
569      *
570      * @return the provisioning data (as a JONObject)
571      */
572     private synchronized JSONObject readProvisioningJson() {
573         String url = URLUtilities.generatePeerProvURL();
574         HttpGet get = new HttpGet(url);
575         try {
576             HttpResponse response = httpclient.execute(get);
577             int code = response.getStatusLine().getStatusCode();
578             if (code != HttpServletResponse.SC_OK) {
579                 logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code);
580                 return null;
581             }
582             HttpEntity entity = response.getEntity();
583             String ctype = entity.getContentType().getValue().trim();
584             if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1)
585                         && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
586                 logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype);
587                 return null;
588             }
589             return new JSONObject(new JSONTokener(entity.getContent()));
590         } catch (Exception e) {
591             logger.warn("PROV5012: readProvisioningJson failed, exception: " + e);
592             return null;
593         } finally {
594             get.releaseConnection();
595         }
596     }
597
598     /**
599      * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in
600      * the remote database.
601      *
602      * @return the bitset
603      */
604     RLEBitSet readRemoteLoglist() {
605         RLEBitSet bs = new RLEBitSet();
606         String url = URLUtilities.generatePeerLogsURL();
607
608         //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
609         if ("".equals(url)) {
610             return bs;
611         }
612         //End of fix.
613
614         HttpGet get = new HttpGet(url);
615         try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
616             HttpResponse response = httpclient.execute(get);
617             int code = response.getStatusLine().getStatusCode();
618             if (code != HttpServletResponse.SC_OK) {
619                 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code);
620                 return bs;
621             }
622             HttpEntity entity = response.getEntity();
623             String ctype = entity.getContentType().getValue().trim();
624             if (!TEXT_CT.equals(ctype)) {
625                 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
626                 return bs;
627             }
628             InputStream is = entity.getContent();
629             int ch;
630             while ((ch = is.read()) >= 0) {
631                 bos.write(ch);
632             }
633             bs.set(bos.toString());
634             is.close();
635         } catch (Exception e) {
636             logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e);
637             return bs;
638         } finally {
639             get.releaseConnection();
640         }
641         return bs;
642     }
643
644     /**
645      * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that
646      * we wish to copy to the local database.
647      *
648      * @param bs the bitset (an RELBitSet) of log records to fetch
649      */
650     void replicateDataRouterLogs(RLEBitSet bs) {
651         String url = URLUtilities.generatePeerLogsURL();
652         HttpPost post = new HttpPost(url);
653         try {
654             String str = bs.toString();
655             HttpEntity body = new ByteArrayEntity(str.getBytes(), ContentType.create(TEXT_CT));
656             post.setEntity(body);
657             if (logger.isDebugEnabled()) {
658                 logger.debug("Requesting records: " + str);
659             }
660
661             HttpResponse response = httpclient.execute(post);
662             int code = response.getStatusLine().getStatusCode();
663             if (code != HttpServletResponse.SC_OK) {
664                 logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code);
665                 return;
666             }
667             HttpEntity entity = response.getEntity();
668             String ctype = entity.getContentType().getValue().trim();
669             if (!TEXT_CT.equals(ctype)) {
670                 logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype);
671                 return;
672             }
673
674             String spoolname = "" + System.currentTimeMillis();
675             Path tmppath = Paths.get(spooldir, spoolname);
676             Path donepath = Paths.get(spooldir, "IN." + spoolname);
677             Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
678             Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
679             logger.info("Approximately " + bs.cardinality() + " records replicated.");
680         } catch (Exception e) {
681             logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e);
682         } finally {
683             post.releaseConnection();
684         }
685     }
686 }