update link to upper-constraints.txt
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / SynchronizerTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning.utils;
26
27 import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT;
28
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
31 import java.io.ByteArrayOutputStream;
32 import java.io.FileInputStream;
33 import java.io.InputStream;
34 import java.net.InetAddress;
35 import java.net.UnknownHostException;
36 import java.nio.file.Files;
37 import java.nio.file.Path;
38 import java.nio.file.Paths;
39 import java.nio.file.StandardCopyOption;
40 import java.security.KeyStore;
41 import java.sql.Connection;
42 import java.sql.SQLException;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collection;
46 import java.util.HashMap;
47 import java.util.Map;
48 import java.util.Set;
49 import java.util.Timer;
50 import java.util.TimerTask;
51 import java.util.TreeSet;
52 import jakarta.servlet.http.HttpServletResponse;
53 import org.apache.http.HttpEntity;
54 import org.apache.http.HttpResponse;
55 import org.apache.http.client.methods.HttpGet;
56 import org.apache.http.client.methods.HttpPost;
57 import org.apache.http.conn.scheme.PlainSocketFactory;
58 import org.apache.http.conn.scheme.Scheme;
59 import org.apache.http.conn.ssl.SSLSocketFactory;
60 import org.apache.http.entity.ByteArrayEntity;
61 import org.apache.http.entity.ContentType;
62 import org.apache.http.impl.client.AbstractHttpClient;
63 import org.apache.http.impl.client.DefaultHttpClient;
64 import org.json.JSONArray;
65 import org.json.JSONException;
66 import org.json.JSONObject;
67 import org.json.JSONTokener;
68 import org.onap.dmaap.datarouter.provisioning.BaseServlet;
69 import org.onap.dmaap.datarouter.provisioning.ProvRunner;
70 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
71 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
72 import org.onap.dmaap.datarouter.provisioning.beans.Group;
73 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
74 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
75 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
76 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
77 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
78
79 /**
80  * This class handles synchronization between provisioning servers (PODs).  It has three primary functions:
81  * <ol>
82  * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
83  * the active (master) POD.</li>
84  * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
85  * <li>Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD)
86  * of this POD.</li>
87  * </ol>
88  * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
89  * <code>
90  * Security.setProperty("networkaddress.cache.ttl", "10");
91  * </code>
92  *
93  * @author Robert Eby
94  * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
95  */
96
97 public class SynchronizerTask extends TimerTask {
98
99     /**
100      * This is a singleton -- there is only one SynchronizerTask object in the server.
101      */
102     private static SynchronizerTask synctask;
103
104     /**
105      * This POD is unknown -- not on the list of PODs.
106      */
107     public static final int UNKNOWN_POD = 0;
108     /**
109      * This POD is active -- on the list of PODs, and the DNS CNAME points to us.
110      */
111     public static final int ACTIVE_POD = 1;
112     /**
113      * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us.
114      */
115     public static final int STANDBY_POD = 2;
116
117     private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"};
118     private static final long ONE_HOUR = 60 * 60 * 1000L;
119
120     private long nextMsg = 0;    // only display the "Current podState" msg every 5 mins.
121
122     private final EELFLogger logger;
123     private final Timer rolex;
124     private final String spooldir;
125     private int podState;
126     private boolean doFetch;
127     private long nextsynctime;
128     private AbstractHttpClient httpclient = null;
129
130     @SuppressWarnings("deprecation")
131     private SynchronizerTask() {
132         logger = EELFManager.getInstance().getLogger("InternalLog");
133         rolex = new Timer();
134         spooldir = ProvRunner.getProvProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
135         podState = UNKNOWN_POD;
136         doFetch = true;        // start off with a fetch
137         nextsynctime = 0;
138
139         logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD");
140         try (AbstractHttpClient hc = new DefaultHttpClient()) {
141             Scheme sch;
142             if (Boolean.TRUE.equals(ProvRunner.getTlsEnabled())) {
143                 SSLSocketFactory socketFactory = ProvRunner.getProvTlsManager().getSslSocketFactory();
144                 socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
145                 sch = new Scheme("https", 443, socketFactory);
146             } else {
147                 PlainSocketFactory socketFactory = new PlainSocketFactory();
148                 sch = new Scheme("http", 80, socketFactory);
149             }
150             hc.getConnectionManager().getSchemeRegistry().register(sch);
151             httpclient = hc;
152             setSynchTimer(ProvRunner.getProvProperties().getProperty(
153                     "org.onap.dmaap.datarouter.provserver.sync_interval", "5000"));
154         } catch (Exception e) {
155             logger.warn("PROV5005: Problem starting the synchronizer: " + e);
156         }
157         logger.info("PROV5000: SynchronizerTask started");
158     }
159
160     private void setSynchTimer(String strInterval) {
161         // Run once every 5 seconds to check DNS, etc.
162         long interval;
163         try {
164             interval = Long.parseLong(strInterval);
165         } catch (NumberFormatException e) {
166             interval = 5000L;
167         }
168         rolex.scheduleAtFixedRate(this, 0L, interval);
169     }
170
171     /**
172      * Get the singleton SynchronizerTask object.
173      *
174      * @return the SynchronizerTask
175      */
176     public static synchronized SynchronizerTask getSynchronizer() {
177         if (synctask == null) {
178             synctask = new SynchronizerTask();
179         }
180         return synctask;
181     }
182
183     /**
184      * What is the podState of this POD?.
185      *
186      * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD
187      */
188     public int getPodState() {
189         return podState;
190     }
191
192     /**
193      * Is this the active POD?.
194      *
195      * @return true if we are active (the master), false otherwise
196      */
197     public boolean isActive() {
198         return podState == ACTIVE_POD;
199     }
200
201     /**
202      * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
203      * should re-synchronize with the master.
204      */
205     public void doFetch() {
206         doFetch = true;
207     }
208
209     /**
210      * Runs once a minute in order to <ol>
211      * <li>lookup DNS names,</li>
212      * <li>determine the podState of this POD,</li>
213      * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.</li>
214      * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
215      * </ol>.
216      */
217     @Override
218     public void run() {
219         try {
220             podState = lookupState();
221             if (podState == STANDBY_POD) {
222                 // Only copy provisioning data FROM the active server TO the standby
223                 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
224                     syncProvisioningData();
225                     logger.info("PROV5013: Sync completed.");
226                     nextsynctime = System.currentTimeMillis() + ONE_HOUR;
227                 }
228             } else {
229                 // Don't do fetches on non-standby PODs
230                 doFetch = false;
231             }
232
233             // Fetch DR logs as needed - server to server
234             LogfileLoader lfl = LogfileLoader.getLoader();
235             if (lfl.isIdle()) {
236                 // Only fetch new logs if the loader is waiting for them.
237                 logger.trace("Checking for logs to replicate...");
238                 RLEBitSet local = lfl.getBitSet();
239                 RLEBitSet remote = readRemoteLoglist();
240                 remote.andNot(local);
241                 if (!remote.isEmpty()) {
242                     logger.debug(" Replicating logs: " + remote);
243                     replicateDataRouterLogs(remote);
244                 }
245             }
246         } catch (Exception e) {
247             logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e);
248         }
249     }
250
251     private void syncProvisioningData() {
252         logger.debug("Initiating a sync...");
253         JSONObject jo = readProvisioningJson();
254         if (jo != null) {
255             doFetch = false;
256             syncFeeds(jo.getJSONArray("feeds"));
257             syncSubs(jo.getJSONArray("subscriptions"));
258             syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
259             syncParams(jo.getJSONObject("parameters"));
260             // The following will not be present in a version=1.0 provfeed
261             JSONArray ja = jo.optJSONArray("ingress");
262             if (ja != null) {
263                 syncIngressRoutes(ja);
264             }
265             JSONObject j2 = jo.optJSONObject("egress");
266             if (j2 != null) {
267                 syncEgressRoutes(j2);
268             }
269             ja = jo.optJSONArray("routing");
270             if (ja != null) {
271                 syncNetworkRoutes(ja);
272             }
273         }
274     }
275
276     /**
277      * This method is used to lookup the CNAME that points to the active server.
278      * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server.
279      *
280      * @return the current podState
281      */
282     public int lookupState() {
283         int newPodState = UNKNOWN_POD;
284         try {
285             InetAddress myaddr = InetAddress.getLocalHost();
286             if (logger.isTraceEnabled()) {
287                 logger.trace("My address: " + myaddr);
288             }
289             String thisPod = myaddr.getHostName();
290             Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
291             if (pods.contains(thisPod)) {
292                 InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
293                 newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD;
294                 if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
295                     logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]);
296                     nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
297                 }
298             } else {
299                 logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers.");
300             }
301         } catch (UnknownHostException e) {
302             logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
303         }
304
305         if (newPodState != podState) {
306             logger.info(String.format("PROV5001: Server podState changed from %s to %s",
307                     stnames[podState], stnames[newPodState]));
308         }
309         return newPodState;
310     }
311
312     /**
313      * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
314      */
315     private void syncFeeds(JSONArray ja) {
316         Collection<Syncable> coll = new ArrayList<>();
317         for (int n = 0; n < ja.length(); n++) {
318             try {
319                 Feed feed = new Feed(ja.getJSONObject(n));
320                 coll.add(feed);
321             } catch (Exception e) {
322                 logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n), e);
323             }
324         }
325         if (sync(coll, Feed.getAllFeeds())) {
326             BaseServlet.provisioningDataChanged();
327         }
328     }
329
330     /**
331      * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB.
332      */
333     private void syncSubs(JSONArray ja) {
334         Collection<Syncable> coll = new ArrayList<>();
335         for (int n = 0; n < ja.length(); n++) {
336             try {
337                 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
338                 JSONObject jsonObject = ja.getJSONObject(n);
339                 jsonObject.put("sync", "true");
340                 Subscription sub = new Subscription(jsonObject);
341                 coll.add(sub);
342             } catch (Exception e) {
343                 logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n), e);
344             }
345         }
346         if (sync(coll, Subscription.getAllSubscriptions())) {
347             BaseServlet.provisioningDataChanged();
348         }
349     }
350
351     /**
352      * Rally:US708115  - Synchronize the Groups in the JSONArray, with the Groups in the DB.
353      */
354     private void syncGroups(JSONArray ja) {
355         Collection<Syncable> coll = new ArrayList<>();
356         for (int n = 0; n < ja.length(); n++) {
357             try {
358                 Group group = new Group(ja.getJSONObject(n));
359                 coll.add(group);
360             } catch (Exception e) {
361                 logger.warn("PROV5004: Invalid object in group: " + ja.optJSONObject(n), e);
362             }
363         }
364         if (sync(coll, Group.getAllgroups())) {
365             BaseServlet.provisioningDataChanged();
366         }
367     }
368
369
370     /**
371      * Synchronize the Parameters in the JSONObject, with the Parameters in the DB.
372      */
373     private void syncParams(JSONObject jo) {
374         Collection<Syncable> coll = new ArrayList<>();
375         for (String k : jo.keySet()) {
376             String val = "";
377             try {
378                 val = jo.getString(k);
379             } catch (JSONException e) {
380                 logger.warn("PROV5004: Invalid object in parameters: " + jo.optJSONObject(k), e);
381                 try {
382                     val = "" + jo.getInt(k);
383                 } catch (JSONException e1) {
384                     logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1);
385                     JSONArray ja = jo.getJSONArray(k);
386                     for (int i = 0; i < ja.length(); i++) {
387                         if (i > 0) {
388                             val += "|";
389                         }
390                         val += ja.getString(i);
391                     }
392                 }
393             }
394             coll.add(new Parameters(k, val));
395         }
396         if (sync(coll, Parameters.getParameterCollection())) {
397             BaseServlet.provisioningDataChanged();
398             BaseServlet.provisioningParametersChanged();
399         }
400     }
401
402     private void syncIngressRoutes(JSONArray ja) {
403         Collection<Syncable> coll = new ArrayList<>();
404         for (int n = 0; n < ja.length(); n++) {
405             try {
406                 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
407                 coll.add(in);
408             } catch (NumberFormatException e) {
409                 logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n));
410             }
411         }
412         if (sync(coll, IngressRoute.getAllIngressRoutes())) {
413             BaseServlet.provisioningDataChanged();
414         }
415     }
416
417     private void syncEgressRoutes(JSONObject jo) {
418         Collection<Syncable> coll = new ArrayList<>();
419         for (String key : jo.keySet()) {
420             try {
421                 int sub = Integer.parseInt(key);
422                 String node = jo.getString(key);
423                 EgressRoute er = new EgressRoute(sub, node);
424                 coll.add(er);
425             } catch (NumberFormatException e) {
426                 logger.warn("PROV5004: Invalid subid in egress routes: " + key, e);
427             } catch (IllegalArgumentException e) {
428                 logger.warn("PROV5004: Invalid node name in egress routes: " + key, e);
429             }
430         }
431         if (sync(coll, EgressRoute.getAllEgressRoutes())) {
432             BaseServlet.provisioningDataChanged();
433         }
434     }
435
436     private void syncNetworkRoutes(JSONArray ja) {
437         Collection<Syncable> coll = new ArrayList<>();
438         for (int n = 0; n < ja.length(); n++) {
439             try {
440                 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
441                 coll.add(nr);
442             } catch (JSONException e) {
443                 logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e);
444             }
445         }
446         if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
447             BaseServlet.provisioningDataChanged();
448         }
449     }
450
451     private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
452         boolean changes = false;
453         try {
454             Map<String, Syncable> newmap = getMap(newc);
455             Map<String, Syncable> oldmap = getMap(oldc);
456             Set<String> union = new TreeSet<>(newmap.keySet());
457             union.addAll(oldmap.keySet());
458             for (String n : union) {
459                 Syncable newobj = newmap.get(n);
460                 Syncable oldobj = oldmap.get(n);
461                 if (oldobj == null) {
462                     try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
463                         changes = insertRecord(conn, newobj);
464                     }
465                 } else if (newobj == null) {
466                     try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
467                         changes = deleteRecord(conn, oldobj);
468                     }
469                 } else if (!newobj.equals(oldobj)) {
470                     try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
471                         changes = updateRecord(conn, newobj, oldobj);
472                     }
473                 }
474             }
475         } catch (SQLException e) {
476             logger.warn("PROV5009: problem during sync, exception: " + e);
477         }
478         return changes;
479     }
480
481     private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) {
482         if (logger.isDebugEnabled()) {
483             logger.debug("  Updating record: " + newobj);
484         }
485         boolean changes = newobj.doUpdate(conn);
486         checkChangeOwner(newobj, oldobj);
487
488         return changes;
489     }
490
491     private boolean deleteRecord(Connection conn, Syncable oldobj) {
492         if (logger.isDebugEnabled()) {
493             logger.debug("  Deleting record: " + oldobj);
494         }
495         return oldobj.doDelete(conn);
496     }
497
498     private boolean insertRecord(Connection conn, Syncable newobj) {
499         if (logger.isDebugEnabled()) {
500             logger.debug("  Inserting record: " + newobj);
501         }
502         return newobj.doInsert(conn);
503     }
504
505     private Map<String, Syncable> getMap(Collection<? extends Syncable> coll) {
506         Map<String, Syncable> map = new HashMap<>();
507         for (Syncable v : coll) {
508             map.put(v.getKey(), v);
509         }
510         return map;
511     }
512
513     /**
514      * Change owner of FEED/SUBSCRIPTION.
515      * Rally US708115 Change Ownership of FEED - 1610
516      */
517     private void checkChangeOwner(Syncable newobj, Syncable oldobj) {
518         if (newobj instanceof Feed) {
519             Feed oldfeed = (Feed) oldobj;
520             Feed newfeed = (Feed) newobj;
521
522             if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
523                 logger.info("PROV5013 -  Previous publisher: "
524                                     + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher());
525                 oldfeed.setPublisher(newfeed.getPublisher());
526                 oldfeed.changeOwnerShip();
527             }
528         } else if (newobj instanceof Subscription) {
529             Subscription oldsub = (Subscription) oldobj;
530             Subscription newsub = (Subscription) newobj;
531
532             if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
533                 logger.info("PROV5013 -  Previous subscriber: "
534                                     + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber());
535                 oldsub.setSubscriber(newsub.getSubscriber());
536                 oldsub.changeOwnerShip();
537             }
538         }
539
540     }
541
542     /**
543      * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
544      *
545      * @return the provisioning data (as a JONObject)
546      */
547     private synchronized JSONObject readProvisioningJson() {
548         String url = URLUtilities.generatePeerProvURL();
549         HttpGet get = new HttpGet(url);
550         try {
551             HttpResponse response = httpclient.execute(get);
552             int code = response.getStatusLine().getStatusCode();
553             if (code != HttpServletResponse.SC_OK) {
554                 logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code);
555                 return null;
556             }
557             HttpEntity entity = response.getEntity();
558             String ctype = entity.getContentType().getValue().trim();
559             if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1)
560                         && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
561                 logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype);
562                 return null;
563             }
564             return new JSONObject(new JSONTokener(entity.getContent()));
565         } catch (Exception e) {
566             logger.warn("PROV5012: readProvisioningJson failed, exception: " + e);
567             return null;
568         } finally {
569             get.releaseConnection();
570         }
571     }
572
573     /**
574      * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in
575      * the remote database.
576      *
577      * @return the bitset
578      */
579     public RLEBitSet readRemoteLoglist() {
580         RLEBitSet bs = new RLEBitSet();
581         String url = URLUtilities.generatePeerLogsURL();
582
583         //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
584         if ("".equals(url)) {
585             return bs;
586         }
587         //End of fix.
588
589         HttpGet get = new HttpGet(url);
590         try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
591             HttpResponse response = httpclient.execute(get);
592             int code = response.getStatusLine().getStatusCode();
593             if (code != HttpServletResponse.SC_OK) {
594                 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code);
595                 return bs;
596             }
597             HttpEntity entity = response.getEntity();
598             String ctype = entity.getContentType().getValue().trim();
599             if (!TEXT_CT.equals(ctype)) {
600                 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
601                 return bs;
602             }
603             InputStream is = entity.getContent();
604             int ch;
605             while ((ch = is.read()) >= 0) {
606                 bos.write(ch);
607             }
608             bs.set(bos.toString());
609             is.close();
610         } catch (Exception e) {
611             logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e);
612             return bs;
613         } finally {
614             get.releaseConnection();
615         }
616         return bs;
617     }
618
619     /**
620      * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that
621      * we wish to copy to the local database.
622      *
623      * @param bs the bitset (an RELBitSet) of log records to fetch
624      */
625     public void replicateDataRouterLogs(RLEBitSet bs) {
626         String url = URLUtilities.generatePeerLogsURL();
627         HttpPost post = new HttpPost(url);
628         try {
629             String str = bs.toString();
630             HttpEntity body = new ByteArrayEntity(str.getBytes(), ContentType.create(TEXT_CT));
631             post.setEntity(body);
632             if (logger.isDebugEnabled()) {
633                 logger.debug("Requesting records: " + str);
634             }
635
636             HttpResponse response = httpclient.execute(post);
637             int code = response.getStatusLine().getStatusCode();
638             if (code != HttpServletResponse.SC_OK) {
639                 logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code);
640                 return;
641             }
642             HttpEntity entity = response.getEntity();
643             String ctype = entity.getContentType().getValue().trim();
644             if (!TEXT_CT.equals(ctype)) {
645                 logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype);
646                 return;
647             }
648
649             String spoolname = "" + System.currentTimeMillis();
650             Path tmppath = Paths.get(spooldir, spoolname);
651             Path donepath = Paths.get(spooldir, "IN." + spoolname);
652             Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
653             Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
654             logger.info("Approximately " + bs.cardinality() + " records replicated.");
655         } catch (Exception e) {
656             logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e);
657         } finally {
658             post.releaseConnection();
659         }
660     }
661 }