8c5a49a49fdfb2b7047674e7902c74f8916c9f80
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / SynchronizerTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning;
26
27 import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT;
28
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
31
32 import java.io.ByteArrayOutputStream;
33 import java.io.File;
34 import java.io.FileInputStream;
35 import java.io.InputStream;
36 import java.net.InetAddress;
37 import java.net.UnknownHostException;
38 import java.nio.file.Files;
39 import java.nio.file.Path;
40 import java.nio.file.Paths;
41 import java.nio.file.StandardCopyOption;
42 import java.security.KeyStore;
43 import java.sql.Connection;
44 import java.sql.SQLException;
45 import java.util.ArrayList;
46 import java.util.Arrays;
47 import java.util.Collection;
48 import java.util.HashMap;
49 import java.util.Map;
50 import java.util.Properties;
51 import java.util.Set;
52 import java.util.Timer;
53 import java.util.TimerTask;
54 import java.util.TreeSet;
55
56 import javax.servlet.http.HttpServletResponse;
57
58 import org.apache.http.HttpEntity;
59 import org.apache.http.HttpResponse;
60 import org.apache.http.client.methods.HttpGet;
61 import org.apache.http.client.methods.HttpPost;
62 import org.apache.http.conn.scheme.Scheme;
63 import org.apache.http.conn.ssl.SSLSocketFactory;
64 import org.apache.http.entity.ByteArrayEntity;
65 import org.apache.http.entity.ContentType;
66 import org.apache.http.impl.client.AbstractHttpClient;
67 import org.apache.http.impl.client.DefaultHttpClient;
68 import org.json.JSONArray;
69 import org.json.JSONException;
70 import org.json.JSONObject;
71 import org.json.JSONTokener;
72 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
73 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
74 import org.onap.dmaap.datarouter.provisioning.beans.Group;
75 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
76 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
77 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
78 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
79 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
80 import org.onap.dmaap.datarouter.provisioning.utils.DB;
81 import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
82 import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
83 import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
84
85 /**
86  * This class handles synchronization between provisioning servers (PODs).  It has three primary functions:
87  * <ol>
88  * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
89  * the active (master) POD.</li>
90  * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
91  * <li>Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD)
92  * of this POD.</li>
93  * </ol>
94  * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
95  * <code>
96  * Security.setProperty("networkaddress.cache.ttl", "10");
97  * </code>
98  *
99  * @author Robert Eby
100  * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
101  */
102 public class SynchronizerTask extends TimerTask {
103
104     /**
105      * This is a singleton -- there is only one SynchronizerTask object in the server.
106      */
107     private static SynchronizerTask synctask;
108
109     /**
110      * This POD is unknown -- not on the list of PODs.
111      */
112     public static final int UNKNOWN_POD = 0;
113     /**
114      * This POD is active -- on the list of PODs, and the DNS CNAME points to us.
115      */
116     public static final int ACTIVE_POD = 1;
117     /**
118      * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us.
119      */
120     public static final int STANDBY_POD = 2;
121
122     private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"};
123     private static final long ONE_HOUR = 60 * 60 * 1000L;
124
125     private long nextMsg = 0;    // only display the "Current podState" msg every 5 mins.
126
127     private final EELFLogger logger;
128     private final Timer rolex;
129     private final String spooldir;
130     private int podState;
131     private boolean doFetch;
132     private long nextsynctime;
133     private AbstractHttpClient httpclient = null;
134
135     @SuppressWarnings("deprecation")
136     private SynchronizerTask() {
137         logger = EELFManager.getInstance().getLogger("InternalLog");
138         rolex = new Timer();
139         spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
140         podState = UNKNOWN_POD;
141         doFetch = true;        // start off with a fetch
142         nextsynctime = 0;
143
144         logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD");
145         try {
146             Properties props = (new DB()).getProperties();
147             String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
148             String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
149             String pass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
150             KeyStore keyStore = KeyStore.getInstance(type);
151             try (FileInputStream instream = new FileInputStream(new File(store))) {
152                 keyStore.load(instream, pass.toCharArray());
153
154             }
155             store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
156             pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY);
157             KeyStore trustStore = null;
158             if (store != null && store.length() > 0) {
159                 trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
160                 try (FileInputStream instream = new FileInputStream(new File(store))) {
161                     trustStore.load(instream, pass.toCharArray());
162
163                 }
164             }
165
166             // We are connecting with the node name, but the certificate will have the CNAME
167             // So we need to accept a non-matching certificate name
168             String keystorepass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
169             try (AbstractHttpClient hc = new DefaultHttpClient()) {
170                 SSLSocketFactory socketFactory =
171                         (trustStore == null)
172                                 ? new SSLSocketFactory(keyStore, keystorepass)
173                                 : new SSLSocketFactory(keyStore, keystorepass, trustStore);
174                 socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
175                 Scheme sch = new Scheme("https", 443, socketFactory);
176                 hc.getConnectionManager().getSchemeRegistry().register(sch);
177                 httpclient = hc;
178             }
179             setSynchTimer(props);
180         } catch (Exception e) {
181             logger.warn("PROV5005: Problem starting the synchronizer: " + e);
182         }
183     }
184
185     private void setSynchTimer(Properties props) {
186         // Run once every 5 seconds to check DNS, etc.
187         long interval;
188         try {
189             String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
190             interval = Long.parseLong(s);
191         } catch (NumberFormatException e) {
192             interval = 5000L;
193         }
194         rolex.scheduleAtFixedRate(this, 0L, interval);
195     }
196
197     /**
198      * Get the singleton SynchronizerTask object.
199      *
200      * @return the SynchronizerTask
201      */
202     public static synchronized SynchronizerTask getSynchronizer() {
203         if (synctask == null) {
204             synctask = new SynchronizerTask();
205         }
206         return synctask;
207     }
208
209     /**
210      * What is the podState of this POD?.
211      *
212      * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD
213      */
214     public int getPodState() {
215         return podState;
216     }
217
218     /**
219      * Is this the active POD?.
220      *
221      * @return true if we are active (the master), false otherwise
222      */
223     public boolean isActive() {
224         return podState == ACTIVE_POD;
225     }
226
227     /**
228      * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
229      * should re-synchronize with the master.
230      */
231     void doFetch() {
232         doFetch = true;
233     }
234
235     /**
236      * Runs once a minute in order to <ol>
237      * <li>lookup DNS names,</li>
238      * <li>determine the podState of this POD,</li>
239      * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.</li>
240      * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
241      * </ol>.
242      */
243     @Override
244     public void run() {
245         try {
246             podState = lookupState();
247             if (podState == STANDBY_POD) {
248                 // Only copy provisioning data FROM the active server TO the standby
249                 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
250                     syncProvisioningData();
251                     logger.info("PROV5013: Sync completed.");
252                     nextsynctime = System.currentTimeMillis() + ONE_HOUR;
253                 }
254             } else {
255                 // Don't do fetches on non-standby PODs
256                 doFetch = false;
257             }
258
259             // Fetch DR logs as needed - server to server
260             LogfileLoader lfl = LogfileLoader.getLoader();
261             if (lfl.isIdle()) {
262                 // Only fetch new logs if the loader is waiting for them.
263                 logger.trace("Checking for logs to replicate...");
264                 RLEBitSet local = lfl.getBitSet();
265                 RLEBitSet remote = readRemoteLoglist();
266                 remote.andNot(local);
267                 if (!remote.isEmpty()) {
268                     logger.debug(" Replicating logs: " + remote);
269                     replicateDataRouterLogs(remote);
270                 }
271             }
272         } catch (Exception e) {
273             logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e);
274         }
275     }
276
277     private void syncProvisioningData() {
278         logger.debug("Initiating a sync...");
279         JSONObject jo = readProvisioningJson();
280         if (jo != null) {
281             doFetch = false;
282             syncFeeds(jo.getJSONArray("feeds"));
283             syncSubs(jo.getJSONArray("subscriptions"));
284             syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
285             syncParams(jo.getJSONObject("parameters"));
286             // The following will not be present in a version=1.0 provfeed
287             JSONArray ja = jo.optJSONArray("ingress");
288             if (ja != null) {
289                 syncIngressRoutes(ja);
290             }
291             JSONObject j2 = jo.optJSONObject("egress");
292             if (j2 != null) {
293                 syncEgressRoutes(j2);
294             }
295             ja = jo.optJSONArray("routing");
296             if (ja != null) {
297                 syncNetworkRoutes(ja);
298             }
299         }
300     }
301
302     /**
303      * This method is used to lookup the CNAME that points to the active server.
304      * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server.
305      *
306      * @return the current podState
307      */
308     int lookupState() {
309         int newPodState = UNKNOWN_POD;
310         try {
311             InetAddress myaddr = InetAddress.getLocalHost();
312             if (logger.isTraceEnabled()) {
313                 logger.trace("My address: " + myaddr);
314             }
315             String thisPod = myaddr.getHostName();
316             Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
317             if (pods.contains(thisPod)) {
318                 InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
319                 newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD;
320                 if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
321                     logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]);
322                     nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
323                 }
324             } else {
325                 logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers.");
326             }
327         } catch (UnknownHostException e) {
328             logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
329         }
330
331         if (newPodState != podState) {
332             logger.info(String.format("PROV5001: Server podState changed from %s to %s",
333                     stnames[podState], stnames[newPodState]));
334         }
335         return newPodState;
336     }
337
338     /**
339      * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
340      */
341     private void syncFeeds(JSONArray ja) {
342         Collection<Syncable> coll = new ArrayList<>();
343         for (int n = 0; n < ja.length(); n++) {
344             try {
345                 Feed f = new Feed(ja.getJSONObject(n));
346                 coll.add(f);
347             } catch (Exception e) {
348                 logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n), e);
349             }
350         }
351         if (sync(coll, Feed.getAllFeeds())) {
352             BaseServlet.provisioningDataChanged();
353         }
354     }
355
356     /**
357      * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB.
358      */
359     private void syncSubs(JSONArray ja) {
360         Collection<Syncable> coll = new ArrayList<>();
361         for (int n = 0; n < ja.length(); n++) {
362             try {
363                 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
364                 JSONObject j = ja.getJSONObject(n);
365                 j.put("sync", "true");
366                 Subscription s = new Subscription(j);
367                 coll.add(s);
368             } catch (Exception e) {
369                 logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n), e);
370             }
371         }
372         if (sync(coll, Subscription.getAllSubscriptions())) {
373             BaseServlet.provisioningDataChanged();
374         }
375     }
376
377     /**
378      * Rally:US708115  - Synchronize the Groups in the JSONArray, with the Groups in the DB.
379      */
380     private void syncGroups(JSONArray ja) {
381         Collection<Syncable> coll = new ArrayList<>();
382         for (int n = 0; n < ja.length(); n++) {
383             try {
384                 Group g = new Group(ja.getJSONObject(n));
385                 coll.add(g);
386             } catch (Exception e) {
387                 logger.warn("PROV5004: Invalid object in group: " + ja.optJSONObject(n), e);
388             }
389         }
390         if (sync(coll, Group.getAllgroups())) {
391             BaseServlet.provisioningDataChanged();
392         }
393     }
394
395
396     /**
397      * Synchronize the Parameters in the JSONObject, with the Parameters in the DB.
398      */
399     private void syncParams(JSONObject jo) {
400         Collection<Syncable> coll = new ArrayList<>();
401         for (String k : jo.keySet()) {
402             String v = "";
403             try {
404                 v = jo.getString(k);
405             } catch (JSONException e) {
406                 logger.warn("PROV5004: Invalid object in parameters: " + jo.optJSONObject(k), e);
407                 try {
408                     v = "" + jo.getInt(k);
409                 } catch (JSONException e1) {
410                     logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1);
411                     JSONArray ja = jo.getJSONArray(k);
412                     for (int i = 0; i < ja.length(); i++) {
413                         if (i > 0) {
414                             v += "|";
415                         }
416                         v += ja.getString(i);
417                     }
418                 }
419             }
420             coll.add(new Parameters(k, v));
421         }
422         if (sync(coll, Parameters.getParameterCollection())) {
423             BaseServlet.provisioningDataChanged();
424             BaseServlet.provisioningParametersChanged();
425         }
426     }
427
428     private void syncIngressRoutes(JSONArray ja) {
429         Collection<Syncable> coll = new ArrayList<>();
430         for (int n = 0; n < ja.length(); n++) {
431             try {
432                 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
433                 coll.add(in);
434             } catch (NumberFormatException e) {
435                 logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n));
436             }
437         }
438         if (sync(coll, IngressRoute.getAllIngressRoutes())) {
439             BaseServlet.provisioningDataChanged();
440         }
441     }
442
443     private void syncEgressRoutes(JSONObject jo) {
444         Collection<Syncable> coll = new ArrayList<>();
445         for (String key : jo.keySet()) {
446             try {
447                 int sub = Integer.parseInt(key);
448                 String node = jo.getString(key);
449                 EgressRoute er = new EgressRoute(sub, node);
450                 coll.add(er);
451             } catch (NumberFormatException e) {
452                 logger.warn("PROV5004: Invalid subid in egress routes: " + key, e);
453             } catch (IllegalArgumentException e) {
454                 logger.warn("PROV5004: Invalid node name in egress routes: " + key, e);
455             }
456         }
457         if (sync(coll, EgressRoute.getAllEgressRoutes())) {
458             BaseServlet.provisioningDataChanged();
459         }
460     }
461
462     private void syncNetworkRoutes(JSONArray ja) {
463         Collection<Syncable> coll = new ArrayList<>();
464         for (int n = 0; n < ja.length(); n++) {
465             try {
466                 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
467                 coll.add(nr);
468             } catch (JSONException e) {
469                 logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e);
470             }
471         }
472         if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
473             BaseServlet.provisioningDataChanged();
474         }
475     }
476
477     private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
478         boolean changes = false;
479         try {
480             Map<String, Syncable> newmap = getMap(newc);
481             Map<String, Syncable> oldmap = getMap(oldc);
482             Set<String> union = new TreeSet<>(newmap.keySet());
483             union.addAll(oldmap.keySet());
484             DB db = new DB();
485             @SuppressWarnings("resource")
486             Connection conn = db.getConnection();
487             for (String n : union) {
488                 Syncable newobj = newmap.get(n);
489                 Syncable oldobj = oldmap.get(n);
490                 if (oldobj == null) {
491                     changes = insertRecord(conn, newobj);
492                 } else if (newobj == null) {
493                     changes = deleteRecord(conn, oldobj);
494                 } else if (!newobj.equals(oldobj)) {
495                     changes = updateRecord(conn, newobj, oldobj);
496                 }
497             }
498             db.release(conn);
499         } catch (SQLException e) {
500             logger.warn("PROV5009: problem during sync, exception: " + e);
501         }
502         return changes;
503     }
504
505     private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) {
506         if (logger.isDebugEnabled()) {
507             logger.debug("  Updating record: " + newobj);
508         }
509         boolean changes = newobj.doUpdate(conn);
510         checkChangeOwner(newobj, oldobj);
511
512         return changes;
513     }
514
515     private boolean deleteRecord(Connection conn, Syncable oldobj) {
516         if (logger.isDebugEnabled()) {
517             logger.debug("  Deleting record: " + oldobj);
518         }
519         return oldobj.doDelete(conn);
520     }
521
522     private boolean insertRecord(Connection conn, Syncable newobj) {
523         if (logger.isDebugEnabled()) {
524             logger.debug("  Inserting record: " + newobj);
525         }
526         return newobj.doInsert(conn);
527     }
528
529     private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {
530         Map<String, Syncable> map = new HashMap<>();
531         for (Syncable v : c) {
532             map.put(v.getKey(), v);
533         }
534         return map;
535     }
536
537     /**
538      * Change owner of FEED/SUBSCRIPTION.
539      * Rally US708115 Change Ownership of FEED - 1610
540      */
541     private void checkChangeOwner(Syncable newobj, Syncable oldobj) {
542         if (newobj instanceof Feed) {
543             Feed oldfeed = (Feed) oldobj;
544             Feed newfeed = (Feed) newobj;
545
546             if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
547                 logger.info("PROV5013 -  Previous publisher: "
548                                     + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher());
549                 oldfeed.setPublisher(newfeed.getPublisher());
550                 oldfeed.changeOwnerShip();
551             }
552         } else if (newobj instanceof Subscription) {
553             Subscription oldsub = (Subscription) oldobj;
554             Subscription newsub = (Subscription) newobj;
555
556             if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
557                 logger.info("PROV5013 -  Previous subscriber: "
558                                     + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber());
559                 oldsub.setSubscriber(newsub.getSubscriber());
560                 oldsub.changeOwnerShip();
561             }
562         }
563
564     }
565
566     /**
567      * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
568      *
569      * @return the provisioning data (as a JONObject)
570      */
571     private synchronized JSONObject readProvisioningJson() {
572         String url = URLUtilities.generatePeerProvURL();
573         HttpGet get = new HttpGet(url);
574         try {
575             HttpResponse response = httpclient.execute(get);
576             int code = response.getStatusLine().getStatusCode();
577             if (code != HttpServletResponse.SC_OK) {
578                 logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code);
579                 return null;
580             }
581             HttpEntity entity = response.getEntity();
582             String ctype = entity.getContentType().getValue().trim();
583             if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1)
584                         && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
585                 logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype);
586                 return null;
587             }
588             return new JSONObject(new JSONTokener(entity.getContent()));
589         } catch (Exception e) {
590             logger.warn("PROV5012: readProvisioningJson failed, exception: " + e);
591             return null;
592         } finally {
593             get.releaseConnection();
594         }
595     }
596
597     /**
598      * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in
599      * the remote database.
600      *
601      * @return the bitset
602      */
603     RLEBitSet readRemoteLoglist() {
604         RLEBitSet bs = new RLEBitSet();
605         String url = URLUtilities.generatePeerLogsURL();
606
607         //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
608         if ("".equals(url)) {
609             return bs;
610         }
611         //End of fix.
612
613         HttpGet get = new HttpGet(url);
614         try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
615             HttpResponse response = httpclient.execute(get);
616             int code = response.getStatusLine().getStatusCode();
617             if (code != HttpServletResponse.SC_OK) {
618                 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code);
619                 return bs;
620             }
621             HttpEntity entity = response.getEntity();
622             String ctype = entity.getContentType().getValue().trim();
623             if (!TEXT_CT.equals(ctype)) {
624                 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
625                 return bs;
626             }
627             InputStream is = entity.getContent();
628             int ch;
629             while ((ch = is.read()) >= 0) {
630                 bos.write(ch);
631             }
632             bs.set(bos.toString());
633             is.close();
634         } catch (Exception e) {
635             logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e);
636             return bs;
637         } finally {
638             get.releaseConnection();
639         }
640         return bs;
641     }
642
643     /**
644      * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that
645      * we wish to copy to the local database.
646      *
647      * @param bs the bitset (an RELBitSet) of log records to fetch
648      */
649     void replicateDataRouterLogs(RLEBitSet bs) {
650         String url = URLUtilities.generatePeerLogsURL();
651         HttpPost post = new HttpPost(url);
652         try {
653             String t = bs.toString();
654             HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create(TEXT_CT));
655             post.setEntity(body);
656             if (logger.isDebugEnabled()) {
657                 logger.debug("Requesting records: " + t);
658             }
659
660             HttpResponse response = httpclient.execute(post);
661             int code = response.getStatusLine().getStatusCode();
662             if (code != HttpServletResponse.SC_OK) {
663                 logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code);
664                 return;
665             }
666             HttpEntity entity = response.getEntity();
667             String ctype = entity.getContentType().getValue().trim();
668             if (!TEXT_CT.equals(ctype)) {
669                 logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype);
670                 return;
671             }
672
673             String spoolname = "" + System.currentTimeMillis();
674             Path tmppath = Paths.get(spooldir, spoolname);
675             Path donepath = Paths.get(spooldir, "IN." + spoolname);
676             Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
677             Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
678             logger.info("Approximately " + bs.cardinality() + " records replicated.");
679         } catch (Exception e) {
680             logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e);
681         } finally {
682             post.releaseConnection();
683         }
684     }
685 }