datarouter-prov code clean - remove tabs
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / SynchronizerTask.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning;
26
27 import java.io.ByteArrayOutputStream;
28 import java.io.File;
29 import java.io.FileInputStream;
30 import java.io.InputStream;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.nio.file.Files;
34 import java.nio.file.Path;
35 import java.nio.file.Paths;
36 import java.nio.file.StandardCopyOption;
37 import java.security.KeyStore;
38 import java.sql.Connection;
39 import java.sql.SQLException;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.HashMap;
44 import java.util.Map;
45 import java.util.Properties;
46 import java.util.Set;
47 import java.util.Timer;
48 import java.util.TimerTask;
49 import java.util.TreeSet;
50
51 import javax.servlet.http.HttpServletResponse;
52
53 import org.apache.http.HttpEntity;
54 import org.apache.http.HttpResponse;
55 import org.apache.http.client.methods.HttpGet;
56 import org.apache.http.client.methods.HttpPost;
57 import org.apache.http.conn.scheme.Scheme;
58 import org.apache.http.conn.ssl.SSLSocketFactory;
59 import org.apache.http.entity.ByteArrayEntity;
60 import org.apache.http.entity.ContentType;
61 import org.apache.http.impl.client.AbstractHttpClient;
62 import org.apache.http.impl.client.DefaultHttpClient;
63 import org.apache.log4j.Logger;
64 import org.json.JSONArray;
65 import org.json.JSONException;
66 import org.json.JSONObject;
67 import org.json.JSONTokener;
68 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
69 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
70 import org.onap.dmaap.datarouter.provisioning.beans.Group;
71 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
72 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
73 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
74 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
75 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
76 import org.onap.dmaap.datarouter.provisioning.utils.DB;
77 import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
78 import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
79 import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
80
81 /**
82  * This class handles synchronization between provisioning servers (PODs).  It has three primary functions:
83  * <ol>
84  * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
85  * the active (master) POD.</li>
86  * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
87  * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)
88  * of this POD.</li>
89  * </ol>
90  * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
91  * <code>
92  *         Security.setProperty("networkaddress.cache.ttl", "10");
93  * </code>
94  *
95  * @author Robert Eby
96  * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
97  */
98 public class SynchronizerTask extends TimerTask {
99     /** This is a singleton -- there is only one SynchronizerTask object in the server */
100     private static SynchronizerTask synctask;
101
102     /** This POD is unknown -- not on the list of PODs */
103     public static final int UNKNOWN = 0;
104     /** This POD is active -- on the list of PODs, and the DNS CNAME points to us */
105     public static final int ACTIVE = 1;
106     /** This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us */
107     public static final int STANDBY = 2;
108     private static final String[] stnames = { "UNKNOWN", "ACTIVE", "STANDBY" };
109     private static final long ONE_HOUR = 60 * 60 * 1000L;
110
111     private final Logger logger;
112     private final Timer rolex;
113     private final String spooldir;
114     private int state;
115     private boolean doFetch;
116     private long nextsynctime;
117     private AbstractHttpClient httpclient = null;
118
119     /**
120      * Get the singleton SynchronizerTask object.
121      * @return the SynchronizerTask
122      */
123     public static synchronized SynchronizerTask getSynchronizer() {
124         if (synctask == null)
125             synctask = new SynchronizerTask();
126         return synctask;
127     }
128
129     @SuppressWarnings("deprecation")
130     private SynchronizerTask() {
131         logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");
132         rolex = new Timer();
133         spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
134         state = UNKNOWN;
135         doFetch = true;        // start off with a fetch
136         nextsynctime = 0;
137
138         logger.info("PROV5000: Sync task starting, server state is UNKNOWN");
139         try {
140             Properties props = (new DB()).getProperties();
141             String type  = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
142             String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
143             String pass  = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY);
144             KeyStore keyStore = KeyStore.getInstance(type);
145             FileInputStream instream = new FileInputStream(new File(store));
146             keyStore.load(instream, pass.toCharArray());
147             instream.close();
148
149             store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
150             pass  = props.getProperty(Main.TRUSTSTORE_PASSWORD_PROPERTY);
151             KeyStore trustStore = null;
152             if (store != null && store.length() > 0) {
153                 trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
154                 instream = new FileInputStream(new File(store));
155                 trustStore.load(instream, pass.toCharArray());
156                 instream.close();
157             }
158
159             // We are connecting with the node name, but the certificate will have the CNAME
160             // So we need to accept a non-matching certificate name
161             String keystorepass  = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref
162             AbstractHttpClient hc = new DefaultHttpClient();
163             SSLSocketFactory socketFactory =
164                 (trustStore == null)
165                 ? new SSLSocketFactory(keyStore, keystorepass)
166                 : new SSLSocketFactory(keyStore, keystorepass, trustStore);
167             socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
168             Scheme sch = new Scheme("https", 443, socketFactory);
169             hc.getConnectionManager().getSchemeRegistry().register(sch);
170             httpclient = hc;
171
172             // Run once every 5 seconds to check DNS, etc.
173             long interval = 0;
174             try {
175                 String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
176                 interval = Long.parseLong(s);
177             } catch (NumberFormatException e) {
178                 interval = 5000L;
179             }
180             rolex.scheduleAtFixedRate(this, 0L, interval);
181         } catch (Exception e) {
182             logger.warn("PROV5005: Problem starting the synchronizer: "+e);
183         }
184     }
185
186     /**
187      * What is the state of this POD?
188      * @return one of ACTIVE, STANDBY, UNKNOWN
189      */
190     public int getState() {
191         return state;
192     }
193
194     /**
195      * Is this the active POD?
196      * @return true if we are active (the master), false otherwise
197      */
198     public boolean isActive() {
199         return state == ACTIVE;
200     }
201
202     /**
203      * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request,
204      * and that we should re-synchronize with the master.
205      */
206     public void doFetch() {
207         doFetch = true;
208     }
209
210     /**
211      * Runs once a minute in order to <ol>
212      * <li>lookup DNS names,</li>
213      * <li>determine the state of this POD,</li>
214      * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>
215      * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
216      * </ol>
217      */
218     @Override
219     public void run() {
220         try {
221             state = lookupState();
222             if (state == STANDBY) {
223                 // Only copy provisioning data FROM the active server TO the standby
224                 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
225                     logger.debug("Initiating a sync...");
226                     JSONObject jo = readProvisioningJSON();
227                     if (jo != null) {
228                         doFetch = false;
229                         syncFeeds( jo.getJSONArray("feeds"));
230                         syncSubs(  jo.getJSONArray("subscriptions"));
231                         syncGroups(  jo.getJSONArray("groups")); //Rally:US708115 - 1610
232                         syncParams(jo.getJSONObject("parameters"));
233                         // The following will not be present in a version=1.0 provfeed
234                         JSONArray ja = jo.optJSONArray("ingress");
235                         if (ja != null)
236                             syncIngressRoutes(ja);
237                         JSONObject j2 = jo.optJSONObject("egress");
238                         if (j2 != null)
239                             syncEgressRoutes( j2);
240                         ja = jo.optJSONArray("routing");
241                         if (ja != null)
242                             syncNetworkRoutes(ja);
243                     }
244                     logger.info("PROV5013: Sync completed.");
245                     nextsynctime = System.currentTimeMillis() + ONE_HOUR;
246                 }
247             } else {
248                 // Don't do fetches on non-standby PODs
249                 doFetch = false;
250             }
251
252             // Fetch DR logs as needed - server to server
253             LogfileLoader lfl = LogfileLoader.getLoader();
254             if (lfl.isIdle()) {
255                 // Only fetch new logs if the loader is waiting for them.
256                 logger.trace("Checking for logs to replicate...");
257                 RLEBitSet local  = lfl.getBitSet();
258                 RLEBitSet remote = readRemoteLoglist();
259                 remote.andNot(local);
260                 if (!remote.isEmpty()) {
261                     logger.debug(" Replicating logs: "+remote);
262                     replicateDRLogs(remote);
263                 }
264             }
265         } catch (Exception e) {
266             logger.warn("PROV0020: Caught exception in SynchronizerTask: "+e);
267             e.printStackTrace();
268         }
269     }
270
271     /**
272      * This method is used to lookup the CNAME that points to the active server.
273      * It returns 0 (UNKNOWN), 1(ACTIVE), or 2 (STANDBY) to indicate the state of this server.
274      * @return the current state
275      */
276     private int lookupState() {
277         int newstate = UNKNOWN;
278         try {
279             InetAddress myaddr = InetAddress.getLocalHost();
280             if (logger.isTraceEnabled())
281                 logger.trace("My address: "+myaddr);
282             String this_pod = myaddr.getHostName();
283             Set<String> pods = new TreeSet<String>(Arrays.asList(BaseServlet.getPods()));
284             if (pods.contains(this_pod)) {
285                 InetAddress pserver = InetAddress.getByName(BaseServlet.active_prov_name);
286                 newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;
287                 if (logger.isDebugEnabled() && System.currentTimeMillis() >= next_msg) {
288                     logger.debug("Active POD = "+pserver+", Current state is "+stnames[newstate]);
289                     next_msg = System.currentTimeMillis() + (5 * 60 * 1000L);
290                 }
291             } else {
292                 logger.warn("PROV5003: My name ("+this_pod+") is missing from the list of provisioning servers.");
293             }
294         } catch (UnknownHostException e) {
295             logger.warn("PROV5002: Cannot determine the name of this provisioning server.");
296         }
297
298         if (newstate != state)
299             logger.info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));
300         return newstate;
301     }
302     private static long next_msg = 0;    // only display the "Current state" msg every 5 mins.
303     /** Synchronize the Feeds in the JSONArray, with the Feeds in the DB. */
304     private void syncFeeds(JSONArray ja) {
305         Collection<Syncable> coll = new ArrayList<Syncable>();
306         for (int n = 0; n < ja.length(); n++) {
307             try {
308                 Feed f = new Feed(ja.getJSONObject(n));
309                 coll.add(f);
310             } catch (Exception e) {
311                 logger.warn("PROV5004: Invalid object in feed: "+ja.optJSONObject(n));
312             }
313         }
314         if (sync(coll, Feed.getAllFeeds()))
315             BaseServlet.provisioningDataChanged();
316     }
317     /** Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB. */
318     private void syncSubs(JSONArray ja) {
319         Collection<Syncable> coll = new ArrayList<Syncable>();
320         for (int n = 0; n < ja.length(); n++) {
321             try {
322                 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
323                 JSONObject j = ja.getJSONObject(n);
324                 j.put("sync", "true");
325                 Subscription s = new Subscription(j);
326                 coll.add(s);
327             } catch (Exception e) {
328                 logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));
329             }
330         }
331         if (sync(coll, Subscription.getAllSubscriptions()))
332             BaseServlet.provisioningDataChanged();
333     }
334
335     /**  Rally:US708115  - Synchronize the Groups in the JSONArray, with the Groups in the DB. */
336     private void syncGroups(JSONArray ja) {
337         Collection<Syncable> coll = new ArrayList<Syncable>();
338         for (int n = 0; n < ja.length(); n++) {
339             try {
340                 Group g = new Group(ja.getJSONObject(n));
341                 coll.add(g);
342             } catch (Exception e) {
343                 logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));
344             }
345         }
346         if (sync(coll, Group.getAllgroups()))
347             BaseServlet.provisioningDataChanged();
348     }
349
350
351     /** Synchronize the Parameters in the JSONObject, with the Parameters in the DB. */
352     private void syncParams(JSONObject jo) {
353         Collection<Syncable> coll = new ArrayList<Syncable>();
354         for (String k : jo.keySet()) {
355             String v = "";
356             try {
357                 v = jo.getString(k);
358             } catch (JSONException e) {
359                 try {
360                     v = ""+jo.getInt(k);
361                 } catch (JSONException e1) {
362                     JSONArray ja = jo.getJSONArray(k);
363                     for (int i = 0; i < ja.length(); i++) {
364                         if (i > 0)
365                             v += "|";
366                         v += ja.getString(i);
367                     }
368                 }
369             }
370             coll.add(new Parameters(k, v));
371         }
372         if (sync(coll, Parameters.getParameterCollection())) {
373             BaseServlet.provisioningDataChanged();
374             BaseServlet.provisioningParametersChanged();
375         }
376     }
377     private void syncIngressRoutes(JSONArray ja) {
378         Collection<Syncable> coll = new ArrayList<Syncable>();
379         for (int n = 0; n < ja.length(); n++) {
380             try {
381                 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
382                 coll.add(in);
383             } catch (NumberFormatException e) {
384                 logger.warn("PROV5004: Invalid object in ingress routes: "+ja.optJSONObject(n));
385             }
386         }
387         if (sync(coll, IngressRoute.getAllIngressRoutes()))
388             BaseServlet.provisioningDataChanged();
389     }
390     private void syncEgressRoutes(JSONObject jo) {
391         Collection<Syncable> coll = new ArrayList<Syncable>();
392         for (String key : jo.keySet()) {
393             try {
394                 int sub = Integer.parseInt(key);
395                 String node = jo.getString(key);
396                 EgressRoute er = new EgressRoute(sub, node);
397                 coll.add(er);
398             } catch (NumberFormatException e) {
399                 logger.warn("PROV5004: Invalid subid in egress routes: "+key);
400             } catch (IllegalArgumentException e) {
401                 logger.warn("PROV5004: Invalid node name in egress routes: "+key);
402             }
403         }
404         if (sync(coll, EgressRoute.getAllEgressRoutes()))
405             BaseServlet.provisioningDataChanged();
406     }
407     private void syncNetworkRoutes(JSONArray ja) {
408         Collection<Syncable> coll = new ArrayList<Syncable>();
409         for (int n = 0; n < ja.length(); n++) {
410             try {
411                 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
412                 coll.add(nr);
413             } catch (JSONException e) {
414                 logger.warn("PROV5004: Invalid object in network routes: "+ja.optJSONObject(n));
415             }
416         }
417         if (sync(coll, NetworkRoute.getAllNetworkRoutes()))
418             BaseServlet.provisioningDataChanged();
419     }
420     private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
421         boolean changes = false;
422         try {
423             Map<String, Syncable> newmap = getMap(newc);
424             Map<String, Syncable> oldmap = getMap(oldc);
425             Set<String> union = new TreeSet<String>(newmap.keySet());
426             union.addAll(oldmap.keySet());
427             DB db = new DB();
428             @SuppressWarnings("resource")
429             Connection conn = db.getConnection();
430             for (String n : union) {
431                 Syncable newobj = newmap.get(n);
432                 Syncable oldobj = oldmap.get(n);
433                 if (oldobj == null) {
434                     if (logger.isDebugEnabled())
435                         logger.debug("  Inserting record: "+newobj);
436                     newobj.doInsert(conn);
437                     changes = true;
438                 } else if (newobj == null) {
439                     if (logger.isDebugEnabled())
440                         logger.debug("  Deleting record: "+oldobj);
441                     oldobj.doDelete(conn);
442                     changes = true;
443                 } else if (!newobj.equals(oldobj)) {
444                     if (logger.isDebugEnabled())
445                         logger.debug("  Updating record: "+newobj);
446                     newobj.doUpdate(conn);
447
448                     /**Rally US708115
449                      * Change Ownership of FEED - 1610, Syncronised with secondary DB.
450                      * */
451                     checkChnageOwner(newobj, oldobj);
452
453                     changes = true;
454                 }
455             }
456             db.release(conn);
457         } catch (SQLException e) {
458             logger.warn("PROV5009: problem during sync, exception: "+e);
459             e.printStackTrace();
460         }
461         return changes;
462     }
463     private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {
464         Map<String, Syncable> map = new HashMap<String, Syncable>();
465         for (Syncable v : c) {
466             map.put(v.getKey(), v);
467         }
468         return map;
469     }
470
471
472     /**Change owner of FEED/SUBSCRIPTION*/
473     /**Rally US708115
474      * Change Ownership of FEED - 1610
475      *
476      * */
477     private void checkChnageOwner(Syncable newobj, Syncable oldobj) {
478         if(newobj instanceof Feed) {
479             Feed oldfeed = (Feed) oldobj;
480             Feed newfeed = (Feed) newobj;
481
482             if(!oldfeed.getPublisher().equals(newfeed.getPublisher())){
483                 logger.info("PROV5013 -  Previous publisher: "+oldfeed.getPublisher() +": New publisher-"+newfeed.getPublisher());
484                 oldfeed.setPublisher(newfeed.getPublisher());
485                 oldfeed.changeOwnerShip();
486             }
487         }
488         else if(newobj instanceof Subscription) {
489             Subscription oldsub = (Subscription) oldobj;
490             Subscription newsub = (Subscription) newobj;
491
492             if(!oldsub.getSubscriber().equals(newsub.getSubscriber())){
493                 logger.info("PROV5013 -  Previous subscriber: "+oldsub.getSubscriber() +": New subscriber-"+newsub.getSubscriber());
494                 oldsub.setSubscriber(newsub.getSubscriber());
495                 oldsub.changeOwnerShip();
496             }
497         }
498
499     }
500
501     /**
502      * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
503      * @return the provisioning data (as a JONObject)
504      */
505     private synchronized JSONObject readProvisioningJSON() {
506         String url  = URLUtilities.generatePeerProvURL();
507         HttpGet get = new HttpGet(url);
508         try {
509             HttpResponse response = httpclient.execute(get);
510             int code = response.getStatusLine().getStatusCode();
511             if (code != HttpServletResponse.SC_OK) {
512                 logger.warn("PROV5010: readProvisioningJSON failed, bad error code: "+code);
513                 return null;
514             }
515             HttpEntity entity = response.getEntity();
516             String ctype = entity.getContentType().getValue().trim();
517             if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
518                 logger.warn("PROV5011: readProvisioningJSON failed, bad content type: "+ctype);
519                 return null;
520             }
521             return new JSONObject(new JSONTokener(entity.getContent()));
522         } catch (Exception e) {
523             logger.warn("PROV5012: readProvisioningJSON failed, exception: "+e);
524             return null;
525         } finally {
526             get.releaseConnection();
527         }
528     }
529     /**
530      * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the
531      * log records available in the remote database.
532      * @return the bitset
533      */
534     private RLEBitSet readRemoteLoglist() {
535         RLEBitSet bs = new RLEBitSet();
536         String url  = URLUtilities.generatePeerLogsURL();
537
538         //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
539         if(url.equals("")) {
540             return bs;
541         }
542         //End of fix.
543
544         HttpGet get = new HttpGet(url);
545         try {
546             HttpResponse response = httpclient.execute(get);
547             int code = response.getStatusLine().getStatusCode();
548             if (code != HttpServletResponse.SC_OK) {
549                 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: "+code);
550                 return bs;
551             }
552             HttpEntity entity = response.getEntity();
553             String ctype = entity.getContentType().getValue().trim();
554             if (!ctype.equals("text/plain")) {
555                 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: "+ctype);
556                 return bs;
557             }
558             InputStream is = entity.getContent();
559             ByteArrayOutputStream bos = new ByteArrayOutputStream();
560             int ch = 0;
561             while ((ch = is.read()) >= 0)
562                 bos.write(ch);
563             bs.set(bos.toString());
564             is.close();
565         } catch (Exception e) {
566             logger.warn("PROV5012: readRemoteLoglist failed, exception: "+e);
567             return bs;
568         } finally {
569             get.releaseConnection();
570         }
571         return bs;
572     }
573     /**
574      * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available
575      * in the remote database that we wish to copy to the local database.
576      * @param bs the bitset (an RELBitSet) of log records to fetch
577      */
578     private void replicateDRLogs(RLEBitSet bs) {
579         String url  = URLUtilities.generatePeerLogsURL();
580         HttpPost post = new HttpPost(url);
581         try {
582             String t = bs.toString();
583             HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));
584             post.setEntity(body);
585             if (logger.isDebugEnabled())
586                 logger.debug("Requesting records: "+t);
587
588             HttpResponse response = httpclient.execute(post);
589             int code = response.getStatusLine().getStatusCode();
590             if (code != HttpServletResponse.SC_OK) {
591                 logger.warn("PROV5010: replicateDRLogs failed, bad error code: "+code);
592                 return;
593             }
594             HttpEntity entity = response.getEntity();
595             String ctype = entity.getContentType().getValue().trim();
596             if (!ctype.equals("text/plain")) {
597                 logger.warn("PROV5011: replicateDRLogs failed, bad content type: "+ctype);
598                 return;
599             }
600
601             String spoolname = "" + System.currentTimeMillis();
602             Path tmppath = Paths.get(spooldir, spoolname);
603             Path donepath = Paths.get(spooldir, "IN."+spoolname);
604             Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
605             Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
606             logger.info("Approximately "+bs.cardinality()+" records replicated.");
607         } catch (Exception e) {
608             logger.warn("PROV5012: replicateDRLogs failed, exception: "+e);
609         } finally {
610             post.releaseConnection();
611         }
612     }
613 }