Refactor Prov DB handling
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / SynchronizerTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning.utils;
26
27 import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT;
28
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
31 import java.io.ByteArrayOutputStream;
32 import java.io.File;
33 import java.io.FileInputStream;
34 import java.io.InputStream;
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.nio.file.Files;
38 import java.nio.file.Path;
39 import java.nio.file.Paths;
40 import java.nio.file.StandardCopyOption;
41 import java.security.KeyStore;
42 import java.sql.Connection;
43 import java.sql.SQLException;
44 import java.util.ArrayList;
45 import java.util.Arrays;
46 import java.util.Collection;
47 import java.util.HashMap;
48 import java.util.Map;
49 import java.util.Set;
50 import java.util.Timer;
51 import java.util.TimerTask;
52 import java.util.TreeSet;
53 import javax.servlet.http.HttpServletResponse;
54 import org.apache.http.HttpEntity;
55 import org.apache.http.HttpResponse;
56 import org.apache.http.client.methods.HttpGet;
57 import org.apache.http.client.methods.HttpPost;
58 import org.apache.http.conn.scheme.Scheme;
59 import org.apache.http.conn.ssl.SSLSocketFactory;
60 import org.apache.http.entity.ByteArrayEntity;
61 import org.apache.http.entity.ContentType;
62 import org.apache.http.impl.client.AbstractHttpClient;
63 import org.apache.http.impl.client.DefaultHttpClient;
64 import org.json.JSONArray;
65 import org.json.JSONException;
66 import org.json.JSONObject;
67 import org.json.JSONTokener;
68 import org.onap.dmaap.datarouter.provisioning.BaseServlet;
69 import org.onap.dmaap.datarouter.provisioning.ProvRunner;
70 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
71 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
72 import org.onap.dmaap.datarouter.provisioning.beans.Group;
73 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
74 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
75 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
76 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
77 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
78
79 /**
80  * This class handles synchronization between provisioning servers (PODs).  It has three primary functions:
81  * <ol>
82  * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
83  * the active (master) POD.</li>
84  * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
85  * <li>Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD)
86  * of this POD.</li>
87  * </ol>
88  * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
89  * <code>
90  * Security.setProperty("networkaddress.cache.ttl", "10");
91  * </code>
92  *
93  * @author Robert Eby
94  * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
95  */
96
97 public class SynchronizerTask extends TimerTask {
98
99     /**
100      * This is a singleton -- there is only one SynchronizerTask object in the server.
101      */
102     private static SynchronizerTask synctask;
103
104     /**
105      * This POD is unknown -- not on the list of PODs.
106      */
107     public static final int UNKNOWN_POD = 0;
108     /**
109      * This POD is active -- on the list of PODs, and the DNS CNAME points to us.
110      */
111     public static final int ACTIVE_POD = 1;
112     /**
113      * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us.
114      */
115     public static final int STANDBY_POD = 2;
116
117     private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"};
118     private static final long ONE_HOUR = 60 * 60 * 1000L;
119
120     private long nextMsg = 0;    // only display the "Current podState" msg every 5 mins.
121
122     private final EELFLogger logger;
123     private final Timer rolex;
124     private final String spooldir;
125     private int podState;
126     private boolean doFetch;
127     private long nextsynctime;
128     private AbstractHttpClient httpclient = null;
129
130     @SuppressWarnings("deprecation")
131     private SynchronizerTask() {
132         logger = EELFManager.getInstance().getLogger("InternalLog");
133         rolex = new Timer();
134         spooldir = ProvRunner.getProvProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
135         podState = UNKNOWN_POD;
136         doFetch = true;        // start off with a fetch
137         nextsynctime = 0;
138
139         logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD");
140         try {
141             // Set up keystore
142             String type = AafPropsUtils.KEYSTORE_TYPE_PROPERTY;
143             String store = ProvRunner.getAafPropsUtils().getKeystorePathProperty();
144             String pass = ProvRunner.getAafPropsUtils().getKeystorePassProperty();
145             KeyStore keyStore = KeyStore.getInstance(type);
146             try (FileInputStream instream = new FileInputStream(new File(store))) {
147                 keyStore.load(instream, pass.toCharArray());
148
149             }
150             // Set up truststore
151             store = ProvRunner.getAafPropsUtils().getTruststorePathProperty();
152             pass = ProvRunner.getAafPropsUtils().getTruststorePassProperty();
153             KeyStore trustStore = null;
154             if (store != null && store.length() > 0) {
155                 trustStore = KeyStore.getInstance(AafPropsUtils.TRUESTSTORE_TYPE_PROPERTY);
156                 try (FileInputStream instream = new FileInputStream(new File(store))) {
157                     trustStore.load(instream, pass.toCharArray());
158
159                 }
160             }
161
162             // We are connecting with the node name, but the certificate will have the CNAME
163             // So we need to accept a non-matching certificate name
164             String keystorepass = ProvRunner.getAafPropsUtils().getKeystorePassProperty();
165             try (AbstractHttpClient hc = new DefaultHttpClient()) {
166                 SSLSocketFactory socketFactory =
167                         (trustStore == null)
168                                 ? new SSLSocketFactory(keyStore, keystorepass)
169                                 : new SSLSocketFactory(keyStore, keystorepass, trustStore);
170                 socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
171                 Scheme sch = new Scheme("https", 443, socketFactory);
172                 hc.getConnectionManager().getSchemeRegistry().register(sch);
173                 httpclient = hc;
174             }
175             setSynchTimer(ProvRunner.getProvProperties().getProperty(
176                 "org.onap.dmaap.datarouter.provserver.sync_interval", "5000"));
177         } catch (Exception e) {
178             logger.warn("PROV5005: Problem starting the synchronizer: " + e);
179         }
180     }
181
182     private void setSynchTimer(String strInterval) {
183         // Run once every 5 seconds to check DNS, etc.
184         long interval;
185         try {
186             interval = Long.parseLong(strInterval);
187         } catch (NumberFormatException e) {
188             interval = 5000L;
189         }
190         rolex.scheduleAtFixedRate(this, 0L, interval);
191     }
192
193     /**
194      * Get the singleton SynchronizerTask object.
195      *
196      * @return the SynchronizerTask
197      */
198     public static synchronized SynchronizerTask getSynchronizer() {
199         if (synctask == null) {
200             synctask = new SynchronizerTask();
201         }
202         return synctask;
203     }
204
205     /**
206      * What is the podState of this POD?.
207      *
208      * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD
209      */
210     public int getPodState() {
211         return podState;
212     }
213
214     /**
215      * Is this the active POD?.
216      *
217      * @return true if we are active (the master), false otherwise
218      */
219     public boolean isActive() {
220         return podState == ACTIVE_POD;
221     }
222
223     /**
224      * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
225      * should re-synchronize with the master.
226      */
227     public void doFetch() {
228         doFetch = true;
229     }
230
231     /**
232      * Runs once a minute in order to <ol>
233      * <li>lookup DNS names,</li>
234      * <li>determine the podState of this POD,</li>
235      * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.</li>
236      * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
237      * </ol>.
238      */
239     @Override
240     public void run() {
241         try {
242             podState = lookupState();
243             if (podState == STANDBY_POD) {
244                 // Only copy provisioning data FROM the active server TO the standby
245                 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
246                     syncProvisioningData();
247                     logger.info("PROV5013: Sync completed.");
248                     nextsynctime = System.currentTimeMillis() + ONE_HOUR;
249                 }
250             } else {
251                 // Don't do fetches on non-standby PODs
252                 doFetch = false;
253             }
254
255             // Fetch DR logs as needed - server to server
256             LogfileLoader lfl = LogfileLoader.getLoader();
257             if (lfl.isIdle()) {
258                 // Only fetch new logs if the loader is waiting for them.
259                 logger.trace("Checking for logs to replicate...");
260                 RLEBitSet local = lfl.getBitSet();
261                 RLEBitSet remote = readRemoteLoglist();
262                 remote.andNot(local);
263                 if (!remote.isEmpty()) {
264                     logger.debug(" Replicating logs: " + remote);
265                     replicateDataRouterLogs(remote);
266                 }
267             }
268         } catch (Exception e) {
269             logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e);
270         }
271     }
272
273     private void syncProvisioningData() {
274         logger.debug("Initiating a sync...");
275         JSONObject jo = readProvisioningJson();
276         if (jo != null) {
277             doFetch = false;
278             syncFeeds(jo.getJSONArray("feeds"));
279             syncSubs(jo.getJSONArray("subscriptions"));
280             syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
281             syncParams(jo.getJSONObject("parameters"));
282             // The following will not be present in a version=1.0 provfeed
283             JSONArray ja = jo.optJSONArray("ingress");
284             if (ja != null) {
285                 syncIngressRoutes(ja);
286             }
287             JSONObject j2 = jo.optJSONObject("egress");
288             if (j2 != null) {
289                 syncEgressRoutes(j2);
290             }
291             ja = jo.optJSONArray("routing");
292             if (ja != null) {
293                 syncNetworkRoutes(ja);
294             }
295         }
296     }
297
298     /**
299      * This method is used to lookup the CNAME that points to the active server.
300      * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server.
301      *
302      * @return the current podState
303      */
304     public int lookupState() {
305         int newPodState = UNKNOWN_POD;
306         try {
307             InetAddress myaddr = InetAddress.getLocalHost();
308             if (logger.isTraceEnabled()) {
309                 logger.trace("My address: " + myaddr);
310             }
311             String thisPod = myaddr.getHostName();
312             Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
313             if (pods.contains(thisPod)) {
314                 InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
315                 newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD;
316                 if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
317                     logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]);
318                     nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
319                 }
320             } else {
321                 logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers.");
322             }
323         } catch (UnknownHostException e) {
324             logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
325         }
326
327         if (newPodState != podState) {
328             logger.info(String.format("PROV5001: Server podState changed from %s to %s",
329                     stnames[podState], stnames[newPodState]));
330         }
331         return newPodState;
332     }
333
334     /**
335      * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
336      */
337     private void syncFeeds(JSONArray ja) {
338         Collection<Syncable> coll = new ArrayList<>();
339         for (int n = 0; n < ja.length(); n++) {
340             try {
341                 Feed feed = new Feed(ja.getJSONObject(n));
342                 coll.add(feed);
343             } catch (Exception e) {
344                 logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n), e);
345             }
346         }
347         if (sync(coll, Feed.getAllFeeds())) {
348             BaseServlet.provisioningDataChanged();
349         }
350     }
351
352     /**
353      * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB.
354      */
355     private void syncSubs(JSONArray ja) {
356         Collection<Syncable> coll = new ArrayList<>();
357         for (int n = 0; n < ja.length(); n++) {
358             try {
359                 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
360                 JSONObject jsonObject = ja.getJSONObject(n);
361                 jsonObject.put("sync", "true");
362                 Subscription sub = new Subscription(jsonObject);
363                 coll.add(sub);
364             } catch (Exception e) {
365                 logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n), e);
366             }
367         }
368         if (sync(coll, Subscription.getAllSubscriptions())) {
369             BaseServlet.provisioningDataChanged();
370         }
371     }
372
373     /**
374      * Rally:US708115  - Synchronize the Groups in the JSONArray, with the Groups in the DB.
375      */
376     private void syncGroups(JSONArray ja) {
377         Collection<Syncable> coll = new ArrayList<>();
378         for (int n = 0; n < ja.length(); n++) {
379             try {
380                 Group group = new Group(ja.getJSONObject(n));
381                 coll.add(group);
382             } catch (Exception e) {
383                 logger.warn("PROV5004: Invalid object in group: " + ja.optJSONObject(n), e);
384             }
385         }
386         if (sync(coll, Group.getAllgroups())) {
387             BaseServlet.provisioningDataChanged();
388         }
389     }
390
391
392     /**
393      * Synchronize the Parameters in the JSONObject, with the Parameters in the DB.
394      */
395     private void syncParams(JSONObject jo) {
396         Collection<Syncable> coll = new ArrayList<>();
397         for (String k : jo.keySet()) {
398             String val = "";
399             try {
400                 val = jo.getString(k);
401             } catch (JSONException e) {
402                 logger.warn("PROV5004: Invalid object in parameters: " + jo.optJSONObject(k), e);
403                 try {
404                     val = "" + jo.getInt(k);
405                 } catch (JSONException e1) {
406                     logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1);
407                     JSONArray ja = jo.getJSONArray(k);
408                     for (int i = 0; i < ja.length(); i++) {
409                         if (i > 0) {
410                             val += "|";
411                         }
412                         val += ja.getString(i);
413                     }
414                 }
415             }
416             coll.add(new Parameters(k, val));
417         }
418         if (sync(coll, Parameters.getParameterCollection())) {
419             BaseServlet.provisioningDataChanged();
420             BaseServlet.provisioningParametersChanged();
421         }
422     }
423
424     private void syncIngressRoutes(JSONArray ja) {
425         Collection<Syncable> coll = new ArrayList<>();
426         for (int n = 0; n < ja.length(); n++) {
427             try {
428                 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
429                 coll.add(in);
430             } catch (NumberFormatException e) {
431                 logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n));
432             }
433         }
434         if (sync(coll, IngressRoute.getAllIngressRoutes())) {
435             BaseServlet.provisioningDataChanged();
436         }
437     }
438
439     private void syncEgressRoutes(JSONObject jo) {
440         Collection<Syncable> coll = new ArrayList<>();
441         for (String key : jo.keySet()) {
442             try {
443                 int sub = Integer.parseInt(key);
444                 String node = jo.getString(key);
445                 EgressRoute er = new EgressRoute(sub, node);
446                 coll.add(er);
447             } catch (NumberFormatException e) {
448                 logger.warn("PROV5004: Invalid subid in egress routes: " + key, e);
449             } catch (IllegalArgumentException e) {
450                 logger.warn("PROV5004: Invalid node name in egress routes: " + key, e);
451             }
452         }
453         if (sync(coll, EgressRoute.getAllEgressRoutes())) {
454             BaseServlet.provisioningDataChanged();
455         }
456     }
457
458     private void syncNetworkRoutes(JSONArray ja) {
459         Collection<Syncable> coll = new ArrayList<>();
460         for (int n = 0; n < ja.length(); n++) {
461             try {
462                 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
463                 coll.add(nr);
464             } catch (JSONException e) {
465                 logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e);
466             }
467         }
468         if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
469             BaseServlet.provisioningDataChanged();
470         }
471     }
472
473     private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
474         boolean changes = false;
475         try {
476             Map<String, Syncable> newmap = getMap(newc);
477             Map<String, Syncable> oldmap = getMap(oldc);
478             Set<String> union = new TreeSet<>(newmap.keySet());
479             union.addAll(oldmap.keySet());
480             for (String n : union) {
481                 Syncable newobj = newmap.get(n);
482                 Syncable oldobj = oldmap.get(n);
483                 if (oldobj == null) {
484                     try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
485                         changes = insertRecord(conn, newobj);
486                     }
487                 } else if (newobj == null) {
488                     try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
489                         changes = deleteRecord(conn, oldobj);
490                     }
491                 } else if (!newobj.equals(oldobj)) {
492                     try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
493                         changes = updateRecord(conn, newobj, oldobj);
494                     }
495                 }
496             }
497         } catch (SQLException e) {
498             logger.warn("PROV5009: problem during sync, exception: " + e);
499         }
500         return changes;
501     }
502
503     private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) {
504         if (logger.isDebugEnabled()) {
505             logger.debug("  Updating record: " + newobj);
506         }
507         boolean changes = newobj.doUpdate(conn);
508         checkChangeOwner(newobj, oldobj);
509
510         return changes;
511     }
512
513     private boolean deleteRecord(Connection conn, Syncable oldobj) {
514         if (logger.isDebugEnabled()) {
515             logger.debug("  Deleting record: " + oldobj);
516         }
517         return oldobj.doDelete(conn);
518     }
519
520     private boolean insertRecord(Connection conn, Syncable newobj) {
521         if (logger.isDebugEnabled()) {
522             logger.debug("  Inserting record: " + newobj);
523         }
524         return newobj.doInsert(conn);
525     }
526
527     private Map<String, Syncable> getMap(Collection<? extends Syncable> coll) {
528         Map<String, Syncable> map = new HashMap<>();
529         for (Syncable v : coll) {
530             map.put(v.getKey(), v);
531         }
532         return map;
533     }
534
535     /**
536      * Change owner of FEED/SUBSCRIPTION.
537      * Rally US708115 Change Ownership of FEED - 1610
538      */
539     private void checkChangeOwner(Syncable newobj, Syncable oldobj) {
540         if (newobj instanceof Feed) {
541             Feed oldfeed = (Feed) oldobj;
542             Feed newfeed = (Feed) newobj;
543
544             if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
545                 logger.info("PROV5013 -  Previous publisher: "
546                                     + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher());
547                 oldfeed.setPublisher(newfeed.getPublisher());
548                 oldfeed.changeOwnerShip();
549             }
550         } else if (newobj instanceof Subscription) {
551             Subscription oldsub = (Subscription) oldobj;
552             Subscription newsub = (Subscription) newobj;
553
554             if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
555                 logger.info("PROV5013 -  Previous subscriber: "
556                                     + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber());
557                 oldsub.setSubscriber(newsub.getSubscriber());
558                 oldsub.changeOwnerShip();
559             }
560         }
561
562     }
563
564     /**
565      * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
566      *
567      * @return the provisioning data (as a JONObject)
568      */
569     private synchronized JSONObject readProvisioningJson() {
570         String url = URLUtilities.generatePeerProvURL();
571         HttpGet get = new HttpGet(url);
572         try {
573             HttpResponse response = httpclient.execute(get);
574             int code = response.getStatusLine().getStatusCode();
575             if (code != HttpServletResponse.SC_OK) {
576                 logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code);
577                 return null;
578             }
579             HttpEntity entity = response.getEntity();
580             String ctype = entity.getContentType().getValue().trim();
581             if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1)
582                         && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
583                 logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype);
584                 return null;
585             }
586             return new JSONObject(new JSONTokener(entity.getContent()));
587         } catch (Exception e) {
588             logger.warn("PROV5012: readProvisioningJson failed, exception: " + e);
589             return null;
590         } finally {
591             get.releaseConnection();
592         }
593     }
594
595     /**
596      * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in
597      * the remote database.
598      *
599      * @return the bitset
600      */
601     public RLEBitSet readRemoteLoglist() {
602         RLEBitSet bs = new RLEBitSet();
603         String url = URLUtilities.generatePeerLogsURL();
604
605         //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
606         if ("".equals(url)) {
607             return bs;
608         }
609         //End of fix.
610
611         HttpGet get = new HttpGet(url);
612         try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
613             HttpResponse response = httpclient.execute(get);
614             int code = response.getStatusLine().getStatusCode();
615             if (code != HttpServletResponse.SC_OK) {
616                 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code);
617                 return bs;
618             }
619             HttpEntity entity = response.getEntity();
620             String ctype = entity.getContentType().getValue().trim();
621             if (!TEXT_CT.equals(ctype)) {
622                 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
623                 return bs;
624             }
625             InputStream is = entity.getContent();
626             int ch;
627             while ((ch = is.read()) >= 0) {
628                 bos.write(ch);
629             }
630             bs.set(bos.toString());
631             is.close();
632         } catch (Exception e) {
633             logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e);
634             return bs;
635         } finally {
636             get.releaseConnection();
637         }
638         return bs;
639     }
640
641     /**
642      * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that
643      * we wish to copy to the local database.
644      *
645      * @param bs the bitset (an RELBitSet) of log records to fetch
646      */
647     public void replicateDataRouterLogs(RLEBitSet bs) {
648         String url = URLUtilities.generatePeerLogsURL();
649         HttpPost post = new HttpPost(url);
650         try {
651             String str = bs.toString();
652             HttpEntity body = new ByteArrayEntity(str.getBytes(), ContentType.create(TEXT_CT));
653             post.setEntity(body);
654             if (logger.isDebugEnabled()) {
655                 logger.debug("Requesting records: " + str);
656             }
657
658             HttpResponse response = httpclient.execute(post);
659             int code = response.getStatusLine().getStatusCode();
660             if (code != HttpServletResponse.SC_OK) {
661                 logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code);
662                 return;
663             }
664             HttpEntity entity = response.getEntity();
665             String ctype = entity.getContentType().getValue().trim();
666             if (!TEXT_CT.equals(ctype)) {
667                 logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype);
668                 return;
669             }
670
671             String spoolname = "" + System.currentTimeMillis();
672             Path tmppath = Paths.get(spooldir, spoolname);
673             Path donepath = Paths.get(spooldir, "IN." + spoolname);
674             Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
675             Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
676             logger.info("Approximately " + bs.cardinality() + " records replicated.");
677         } catch (Exception e) {
678             logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e);
679         } finally {
680             post.releaseConnection();
681         }
682     }
683 }