9eece14f9b04deb4bc510ed93bc79938f3736977
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / SynchronizerTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning;
26
27 import java.io.ByteArrayOutputStream;
28 import java.io.File;
29 import java.io.FileInputStream;
30 import java.io.InputStream;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.nio.file.Files;
34 import java.nio.file.Path;
35 import java.nio.file.Paths;
36 import java.nio.file.StandardCopyOption;
37 import java.security.KeyStore;
38 import java.sql.Connection;
39 import java.sql.SQLException;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.HashMap;
44 import java.util.Map;
45 import java.util.Properties;
46 import java.util.Set;
47 import java.util.Timer;
48 import java.util.TimerTask;
49 import java.util.TreeSet;
50
51 import javax.servlet.http.HttpServletResponse;
52
53 import org.apache.http.HttpEntity;
54 import org.apache.http.HttpResponse;
55 import org.apache.http.client.methods.HttpGet;
56 import org.apache.http.client.methods.HttpPost;
57 import org.apache.http.conn.scheme.Scheme;
58 import org.apache.http.conn.ssl.SSLSocketFactory;
59 import org.apache.http.entity.ByteArrayEntity;
60 import org.apache.http.entity.ContentType;
61 import org.apache.http.impl.client.AbstractHttpClient;
62 import org.apache.http.impl.client.DefaultHttpClient;
63 import org.apache.log4j.Logger;
64 import org.json.JSONArray;
65 import org.json.JSONException;
66 import org.json.JSONObject;
67 import org.json.JSONTokener;
68 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
69 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
70 import org.onap.dmaap.datarouter.provisioning.beans.Group;
71 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
72 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
73 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
74 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
75 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
76 import org.onap.dmaap.datarouter.provisioning.utils.DB;
77 import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
78 import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
79 import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
80
81 /**
82  * This class handles synchronization between provisioning servers (PODs).  It has three primary functions:
83  * <ol>
84  * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
85  * the active (master) POD.</li>
86  * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
87  * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)
88  * of this POD.</li>
89  * </ol>
90  * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
91  * <code>
92  * Security.setProperty("networkaddress.cache.ttl", "10");
93  * </code>
94  *
95  * @author Robert Eby
96  * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
97  */
98 public class SynchronizerTask extends TimerTask {
99
100     /**
101      * This is a singleton -- there is only one SynchronizerTask object in the server
102      */
103     private static SynchronizerTask synctask;
104
105     /**
106      * This POD is unknown -- not on the list of PODs
107      */
108     public static final int UNKNOWN = 0;
109     /**
110      * This POD is active -- on the list of PODs, and the DNS CNAME points to us
111      */
112     public static final int ACTIVE = 1;
113     /**
114      * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us
115      */
116     public static final int STANDBY = 2;
117     private static final String[] stnames = {"UNKNOWN", "ACTIVE", "STANDBY"};
118     private static final long ONE_HOUR = 60 * 60 * 1000L;
119
120     private final Logger logger;
121     private final Timer rolex;
122     private final String spooldir;
123     private int state;
124     private boolean doFetch;
125     private long nextsynctime;
126     private AbstractHttpClient httpclient = null;
127
128     /**
129      * Get the singleton SynchronizerTask object.
130      *
131      * @return the SynchronizerTask
132      */
133     public static synchronized SynchronizerTask getSynchronizer() {
134         if (synctask == null) {
135             synctask = new SynchronizerTask();
136         }
137         return synctask;
138     }
139
140     @SuppressWarnings("deprecation")
141     private SynchronizerTask() {
142         logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");
143         rolex = new Timer();
144         spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
145         state = UNKNOWN;
146         doFetch = true;        // start off with a fetch
147         nextsynctime = 0;
148
149         logger.info("PROV5000: Sync task starting, server state is UNKNOWN");
150         try {
151             Properties props = (new DB()).getProperties();
152             String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
153             String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
154             String pass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
155             KeyStore keyStore = KeyStore.getInstance(type);
156             try(FileInputStream instream = new FileInputStream(new File(store))) {
157                 keyStore.load(instream, pass.toCharArray());
158
159             }
160                 store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
161                 pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY);
162                 KeyStore trustStore = null;
163                 if (store != null && store.length() > 0) {
164                     trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
165                     try(FileInputStream instream = new FileInputStream(new File(store))){
166                         trustStore.load(instream, pass.toCharArray());
167
168                     }
169                 }
170
171             // We are connecting with the node name, but the certificate will have the CNAME
172             // So we need to accept a non-matching certificate name
173             String keystorepass = props.getProperty(
174                 Main.KEYSTORE_PASS_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref
175            try(AbstractHttpClient hc = new DefaultHttpClient()) {
176                SSLSocketFactory socketFactory =
177                        (trustStore == null)
178                                ? new SSLSocketFactory(keyStore, keystorepass)
179                                : new SSLSocketFactory(keyStore, keystorepass, trustStore);
180                socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
181                Scheme sch = new Scheme("https", 443, socketFactory);
182                hc.getConnectionManager().getSchemeRegistry().register(sch);
183             httpclient = hc;
184            }
185             // Run once every 5 seconds to check DNS, etc.
186             long interval = 0;
187             try {
188                 String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
189                 interval = Long.parseLong(s);
190             } catch (NumberFormatException e) {
191                 interval = 5000L;
192             }
193             rolex.scheduleAtFixedRate(this, 0L, interval);
194         } catch (Exception e) {
195             logger.warn("PROV5005: Problem starting the synchronizer: " + e);
196         }
197     }
198
199     /**
200      * What is the state of this POD?
201      *
202      * @return one of ACTIVE, STANDBY, UNKNOWN
203      */
204     public int getState() {
205         return state;
206     }
207
208     /**
209      * Is this the active POD?
210      *
211      * @return true if we are active (the master), false otherwise
212      */
213     public boolean isActive() {
214         return state == ACTIVE;
215     }
216
217     /**
218      * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
219      * should re-synchronize with the master.
220      */
221     public void doFetch() {
222         doFetch = true;
223     }
224
225     /**
226      * Runs once a minute in order to <ol>
227      * <li>lookup DNS names,</li>
228      * <li>determine the state of this POD,</li>
229      * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>
230      * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
231      * </ol>
232      */
233     @Override
234     public void run() {
235         try {
236             state = lookupState();
237             if (state == STANDBY) {
238                 // Only copy provisioning data FROM the active server TO the standby
239                 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
240                     logger.debug("Initiating a sync...");
241                     JSONObject jo = readProvisioningJSON();
242                     if (jo != null) {
243                         doFetch = false;
244                         syncFeeds(jo.getJSONArray("feeds"));
245                         syncSubs(jo.getJSONArray("subscriptions"));
246                         syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
247                         syncParams(jo.getJSONObject("parameters"));
248                         // The following will not be present in a version=1.0 provfeed
249                         JSONArray ja = jo.optJSONArray("ingress");
250                         if (ja != null) {
251                             syncIngressRoutes(ja);
252                         }
253                         JSONObject j2 = jo.optJSONObject("egress");
254                         if (j2 != null) {
255                             syncEgressRoutes(j2);
256                         }
257                         ja = jo.optJSONArray("routing");
258                         if (ja != null) {
259                             syncNetworkRoutes(ja);
260                         }
261                     }
262                     logger.info("PROV5013: Sync completed.");
263                     nextsynctime = System.currentTimeMillis() + ONE_HOUR;
264                 }
265             } else {
266                 // Don't do fetches on non-standby PODs
267                 doFetch = false;
268             }
269
270             // Fetch DR logs as needed - server to server
271             LogfileLoader lfl = LogfileLoader.getLoader();
272             if (lfl.isIdle()) {
273                 // Only fetch new logs if the loader is waiting for them.
274                 logger.trace("Checking for logs to replicate...");
275                 RLEBitSet local = lfl.getBitSet();
276                 RLEBitSet remote = readRemoteLoglist();
277                 remote.andNot(local);
278                 if (!remote.isEmpty()) {
279                     logger.debug(" Replicating logs: " + remote);
280                     replicateDRLogs(remote);
281                 }
282             }
283         } catch (Exception e) {
284             logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e);
285         }
286     }
287
288     /**
289      * This method is used to lookup the CNAME that points to the active server. It returns 0 (UNKNOWN), 1(ACTIVE), or 2
290      * (STANDBY) to indicate the state of this server.
291      *
292      * @return the current state
293      */
294     private int lookupState() {
295         int newstate = UNKNOWN;
296         try {
297             InetAddress myaddr = InetAddress.getLocalHost();
298             if (logger.isTraceEnabled()) {
299                 logger.trace("My address: " + myaddr);
300             }
301             String thisPod = myaddr.getHostName();
302             Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
303             if (pods.contains(thisPod)) {
304                 InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
305                 newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;
306                 if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
307                     logger.debug("Active POD = " + pserver + ", Current state is " + stnames[newstate]);
308                     nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
309                 }
310             } else {
311                 logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers.");
312             }
313         } catch (UnknownHostException e) {
314             logger.warn("PROV5002: Cannot determine the name of this provisioning server.");
315         }
316
317         if (newstate != state) {
318             logger
319                 .info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));
320         }
321         return newstate;
322     }
323
324     private static long nextMsg = 0;    // only display the "Current state" msg every 5 mins.
325
326     /**
327      * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
328      */
329     private void syncFeeds(JSONArray ja) {
330         Collection<Syncable> coll = new ArrayList<>();
331         for (int n = 0; n < ja.length(); n++) {
332             try {
333                 Feed f = new Feed(ja.getJSONObject(n));
334                 coll.add(f);
335             } catch (Exception e) {
336                 logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n));
337             }
338         }
339         if (sync(coll, Feed.getAllFeeds())) {
340             BaseServlet.provisioningDataChanged();
341         }
342     }
343
344     /**
345      * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB.
346      */
347     private void syncSubs(JSONArray ja) {
348         Collection<Syncable> coll = new ArrayList<>();
349         for (int n = 0; n < ja.length(); n++) {
350             try {
351                 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
352                 JSONObject j = ja.getJSONObject(n);
353                 j.put("sync", "true");
354                 Subscription s = new Subscription(j);
355                 coll.add(s);
356             } catch (Exception e) {
357                 logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n));
358             }
359         }
360         if (sync(coll, Subscription.getAllSubscriptions())) {
361             BaseServlet.provisioningDataChanged();
362         }
363     }
364
365     /**
366      * Rally:US708115  - Synchronize the Groups in the JSONArray, with the Groups in the DB.
367      */
368     private void syncGroups(JSONArray ja) {
369         Collection<Syncable> coll = new ArrayList<>();
370         for (int n = 0; n < ja.length(); n++) {
371             try {
372                 Group g = new Group(ja.getJSONObject(n));
373                 coll.add(g);
374             } catch (Exception e) {
375                 logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n));
376             }
377         }
378         if (sync(coll, Group.getAllgroups())) {
379             BaseServlet.provisioningDataChanged();
380         }
381     }
382
383
384     /**
385      * Synchronize the Parameters in the JSONObject, with the Parameters in the DB.
386      */
387     private void syncParams(JSONObject jo) {
388         Collection<Syncable> coll = new ArrayList<>();
389         for (String k : jo.keySet()) {
390             String v = "";
391             try {
392                 v = jo.getString(k);
393             } catch (JSONException e) {
394                 try {
395                     v = "" + jo.getInt(k);
396                 } catch (JSONException e1) {
397                     JSONArray ja = jo.getJSONArray(k);
398                     for (int i = 0; i < ja.length(); i++) {
399                         if (i > 0) {
400                             v += "|";
401                         }
402                         v += ja.getString(i);
403                     }
404                 }
405             }
406             coll.add(new Parameters(k, v));
407         }
408         if (sync(coll, Parameters.getParameterCollection())) {
409             BaseServlet.provisioningDataChanged();
410             BaseServlet.provisioningParametersChanged();
411         }
412     }
413
414     private void syncIngressRoutes(JSONArray ja) {
415         Collection<Syncable> coll = new ArrayList<>();
416         for (int n = 0; n < ja.length(); n++) {
417             try {
418                 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
419                 coll.add(in);
420             } catch (NumberFormatException e) {
421                 logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n));
422             }
423         }
424         if (sync(coll, IngressRoute.getAllIngressRoutes())) {
425             BaseServlet.provisioningDataChanged();
426         }
427     }
428
429     private void syncEgressRoutes(JSONObject jo) {
430         Collection<Syncable> coll = new ArrayList<>();
431         for (String key : jo.keySet()) {
432             try {
433                 int sub = Integer.parseInt(key);
434                 String node = jo.getString(key);
435                 EgressRoute er = new EgressRoute(sub, node);
436                 coll.add(er);
437             } catch (NumberFormatException e) {
438                 logger.warn("PROV5004: Invalid subid in egress routes: " + key);
439             } catch (IllegalArgumentException e) {
440                 logger.warn("PROV5004: Invalid node name in egress routes: " + key);
441             }
442         }
443         if (sync(coll, EgressRoute.getAllEgressRoutes())) {
444             BaseServlet.provisioningDataChanged();
445         }
446     }
447
448     private void syncNetworkRoutes(JSONArray ja) {
449         Collection<Syncable> coll = new ArrayList<>();
450         for (int n = 0; n < ja.length(); n++) {
451             try {
452                 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
453                 coll.add(nr);
454             } catch (JSONException e) {
455                 logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n));
456             }
457         }
458         if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
459             BaseServlet.provisioningDataChanged();
460         }
461     }
462
463     private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
464         boolean changes = false;
465         try {
466             Map<String, Syncable> newmap = getMap(newc);
467             Map<String, Syncable> oldmap = getMap(oldc);
468             Set<String> union = new TreeSet<>(newmap.keySet());
469             union.addAll(oldmap.keySet());
470             DB db = new DB();
471             @SuppressWarnings("resource")
472             Connection conn = db.getConnection();
473             for (String n : union) {
474                 Syncable newobj = newmap.get(n);
475                 Syncable oldobj = oldmap.get(n);
476                 if (oldobj == null) {
477                     if (logger.isDebugEnabled()) {
478                         logger.debug("  Inserting record: " + newobj);
479                     }
480                     newobj.doInsert(conn);
481                     changes = true;
482                 } else if (newobj == null) {
483                     if (logger.isDebugEnabled()) {
484                         logger.debug("  Deleting record: " + oldobj);
485                     }
486                     oldobj.doDelete(conn);
487                     changes = true;
488                 } else if (!newobj.equals(oldobj)) {
489                     if (logger.isDebugEnabled()) {
490                         logger.debug("  Updating record: " + newobj);
491                     }
492                     newobj.doUpdate(conn);
493
494                     /**Rally US708115
495                      * Change Ownership of FEED - 1610, Syncronised with secondary DB.
496                      * */
497                     checkChnageOwner(newobj, oldobj);
498
499                     changes = true;
500                 }
501             }
502             db.release(conn);
503         } catch (SQLException e) {
504             logger.warn("PROV5009: problem during sync, exception: " + e);
505         }
506         return changes;
507     }
508
509     private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {
510         Map<String, Syncable> map = new HashMap<>();
511         for (Syncable v : c) {
512             map.put(v.getKey(), v);
513         }
514         return map;
515     }
516
517     /**Change owner of FEED/SUBSCRIPTION*/
518     /**
519      * Rally US708115 Change Ownership of FEED - 1610
520      */
521     private void checkChnageOwner(Syncable newobj, Syncable oldobj) {
522         if (newobj instanceof Feed) {
523             Feed oldfeed = (Feed) oldobj;
524             Feed newfeed = (Feed) newobj;
525
526             if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
527                 logger.info("PROV5013 -  Previous publisher: " + oldfeed.getPublisher() + ": New publisher-" + newfeed
528                     .getPublisher());
529                 oldfeed.setPublisher(newfeed.getPublisher());
530                 oldfeed.changeOwnerShip();
531             }
532         } else if (newobj instanceof Subscription) {
533             Subscription oldsub = (Subscription) oldobj;
534             Subscription newsub = (Subscription) newobj;
535
536             if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
537                 logger.info("PROV5013 -  Previous subscriber: " + oldsub.getSubscriber() + ": New subscriber-" + newsub
538                     .getSubscriber());
539                 oldsub.setSubscriber(newsub.getSubscriber());
540                 oldsub.changeOwnerShip();
541             }
542         }
543
544     }
545
546     /**
547      * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
548      *
549      * @return the provisioning data (as a JONObject)
550      */
551     private synchronized JSONObject readProvisioningJSON() {
552         String url = URLUtilities.generatePeerProvURL();
553         HttpGet get = new HttpGet(url);
554         try {
555             HttpResponse response = httpclient.execute(get);
556             int code = response.getStatusLine().getStatusCode();
557             if (code != HttpServletResponse.SC_OK) {
558                 logger.warn("PROV5010: readProvisioningJSON failed, bad error code: " + code);
559                 return null;
560             }
561             HttpEntity entity = response.getEntity();
562             String ctype = entity.getContentType().getValue().trim();
563             if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype
564                 .equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
565                 logger.warn("PROV5011: readProvisioningJSON failed, bad content type: " + ctype);
566                 return null;
567             }
568             return new JSONObject(new JSONTokener(entity.getContent()));
569         } catch (Exception e) {
570             logger.warn("PROV5012: readProvisioningJSON failed, exception: " + e);
571             return null;
572         } finally {
573             get.releaseConnection();
574         }
575     }
576
577     /**
578      * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in
579      * the remote database.
580      *
581      * @return the bitset
582      */
583     private RLEBitSet readRemoteLoglist() {
584         RLEBitSet bs = new RLEBitSet();
585         String url = URLUtilities.generatePeerLogsURL();
586
587         //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
588         if (url.equals("")) {
589             return bs;
590         }
591         //End of fix.
592
593         HttpGet get = new HttpGet(url);
594         try {
595             HttpResponse response = httpclient.execute(get);
596             int code = response.getStatusLine().getStatusCode();
597             if (code != HttpServletResponse.SC_OK) {
598                 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code);
599                 return bs;
600             }
601             HttpEntity entity = response.getEntity();
602             String ctype = entity.getContentType().getValue().trim();
603             if (!ctype.equals("text/plain")) {
604                 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
605                 return bs;
606             }
607             InputStream is = entity.getContent();
608             ByteArrayOutputStream bos = new ByteArrayOutputStream();
609             int ch = 0;
610             while ((ch = is.read()) >= 0) {
611                 bos.write(ch);
612             }
613             bs.set(bos.toString());
614             is.close();
615         } catch (Exception e) {
616             logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e);
617             return bs;
618         } finally {
619             get.releaseConnection();
620         }
621         return bs;
622     }
623
624     /**
625      * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that
626      * we wish to copy to the local database.
627      *
628      * @param bs the bitset (an RELBitSet) of log records to fetch
629      */
630     private void replicateDRLogs(RLEBitSet bs) {
631         String url = URLUtilities.generatePeerLogsURL();
632         HttpPost post = new HttpPost(url);
633         try {
634             String t = bs.toString();
635             HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));
636             post.setEntity(body);
637             if (logger.isDebugEnabled()) {
638                 logger.debug("Requesting records: " + t);
639             }
640
641             HttpResponse response = httpclient.execute(post);
642             int code = response.getStatusLine().getStatusCode();
643             if (code != HttpServletResponse.SC_OK) {
644                 logger.warn("PROV5010: replicateDRLogs failed, bad error code: " + code);
645                 return;
646             }
647             HttpEntity entity = response.getEntity();
648             String ctype = entity.getContentType().getValue().trim();
649             if (!ctype.equals("text/plain")) {
650                 logger.warn("PROV5011: replicateDRLogs failed, bad content type: " + ctype);
651                 return;
652             }
653
654             String spoolname = "" + System.currentTimeMillis();
655             Path tmppath = Paths.get(spooldir, spoolname);
656             Path donepath = Paths.get(spooldir, "IN." + spoolname);
657             Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
658             Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
659             logger.info("Approximately " + bs.cardinality() + " records replicated.");
660         } catch (Exception e) {
661             logger.warn("PROV5012: replicateDRLogs failed, exception: " + e);
662         } finally {
663             post.releaseConnection();
664         }
665     }
666 }