1 /*******************************************************************************
\r
2 * ============LICENSE_START==================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
25 package org.onap.dmaap.datarouter.provisioning;
\r
27 import java.io.ByteArrayOutputStream;
\r
28 import java.io.File;
\r
29 import java.io.FileInputStream;
\r
30 import java.io.InputStream;
\r
31 import java.net.InetAddress;
\r
32 import java.net.UnknownHostException;
\r
33 import java.nio.file.Files;
\r
34 import java.nio.file.Path;
\r
35 import java.nio.file.Paths;
\r
36 import java.nio.file.StandardCopyOption;
\r
37 import java.security.KeyStore;
\r
38 import java.sql.Connection;
\r
39 import java.sql.SQLException;
\r
40 import java.util.ArrayList;
\r
41 import java.util.Arrays;
\r
42 import java.util.Collection;
\r
43 import java.util.HashMap;
\r
44 import java.util.Map;
\r
45 import java.util.Properties;
\r
46 import java.util.Set;
\r
47 import java.util.Timer;
\r
48 import java.util.TimerTask;
\r
49 import java.util.TreeSet;
\r
51 import javax.servlet.http.HttpServletResponse;
\r
53 import org.apache.http.HttpEntity;
\r
54 import org.apache.http.HttpResponse;
\r
55 import org.apache.http.client.methods.HttpGet;
\r
56 import org.apache.http.client.methods.HttpPost;
\r
57 import org.apache.http.conn.scheme.Scheme;
\r
58 import org.apache.http.conn.ssl.SSLSocketFactory;
\r
59 import org.apache.http.entity.ByteArrayEntity;
\r
60 import org.apache.http.entity.ContentType;
\r
61 import org.apache.http.impl.client.AbstractHttpClient;
\r
62 import org.apache.http.impl.client.DefaultHttpClient;
\r
63 import org.apache.log4j.Logger;
\r
64 import org.json.JSONArray;
\r
65 import org.json.JSONException;
\r
66 import org.json.JSONObject;
\r
67 import org.json.JSONTokener;
\r
68 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
\r
69 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
\r
70 import org.onap.dmaap.datarouter.provisioning.beans.Group;
\r
71 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
\r
72 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
\r
73 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
\r
74 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
\r
75 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
\r
76 import org.onap.dmaap.datarouter.provisioning.utils.DB;
\r
77 import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
\r
78 import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
\r
79 import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
\r
82 * This class handles synchronization between provisioning servers (PODs). It has three primary functions:
\r
84 * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
\r
85 * the active (master) POD.</li>
\r
86 * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MySQL in sync.</li>
\r
87 * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)
\r
90 * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
\r
92 * Security.setProperty("networkaddress.cache.ttl", "10");
\r
95 * @author Robert Eby
\r
96 * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
\r
98 public class SynchronizerTask extends TimerTask {
\r
99 /** This is a singleton -- there is only one SynchronizerTask object in the server */
\r
100 private static SynchronizerTask synctask;
\r
102 /** This POD is unknown -- not on the list of PODs */
\r
103 public static final int UNKNOWN = 0;
\r
104 /** This POD is active -- on the list of PODs, and the DNS CNAME points to us */
\r
105 public static final int ACTIVE = 1;
\r
106 /** This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us */
\r
107 public static final int STANDBY = 2;
\r
108 private static final String[] stnames = { "UNKNOWN", "ACTIVE", "STANDBY" };
\r
109 private static final long ONE_HOUR = 60 * 60 * 1000L;
\r
111 private final Logger logger;
\r
112 private final Timer rolex;
\r
113 private final String spooldir;
\r
115 private boolean doFetch;
\r
116 private long nextsynctime;
\r
117 private AbstractHttpClient httpclient = null;
\r
120 * Get the singleton SynchronizerTask object.
\r
121 * @return the SynchronizerTask
\r
123 public static synchronized SynchronizerTask getSynchronizer() {
\r
124 if (synctask == null)
\r
125 synctask = new SynchronizerTask();
\r
129 @SuppressWarnings("deprecation")
\r
130 private SynchronizerTask() {
\r
131 logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");
\r
132 rolex = new Timer();
\r
133 spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
\r
135 doFetch = true; // start off with a fetch
\r
138 logger.info("PROV5000: Sync task starting, server state is UNKNOWN");
\r
140 Properties props = (new DB()).getProperties();
\r
141 String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
\r
142 String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
\r
143 String pass = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY);
\r
144 KeyStore keyStore = KeyStore.getInstance(type);
\r
145 FileInputStream instream = new FileInputStream(new File(store));
\r
146 keyStore.load(instream, pass.toCharArray());
\r
149 store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
\r
150 pass = props.getProperty(Main.TRUSTSTORE_PASSWORD_PROPERTY);
\r
151 KeyStore trustStore = null;
\r
152 if (store != null && store.length() > 0) {
\r
153 trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
\r
154 instream = new FileInputStream(new File(store));
\r
155 trustStore.load(instream, pass.toCharArray());
\r
159 // We are connecting with the node name, but the certificate will have the CNAME
\r
160 // So we need to accept a non-matching certificate name
\r
161 String keystorepass = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref
\r
162 AbstractHttpClient hc = new DefaultHttpClient();
\r
163 SSLSocketFactory socketFactory =
\r
164 (trustStore == null)
\r
165 ? new SSLSocketFactory(keyStore, keystorepass)
\r
166 : new SSLSocketFactory(keyStore, keystorepass, trustStore);
\r
167 socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
\r
168 Scheme sch = new Scheme("https", 443, socketFactory);
\r
169 hc.getConnectionManager().getSchemeRegistry().register(sch);
\r
172 // Run once every 5 seconds to check DNS, etc.
\r
175 String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
\r
176 interval = Long.parseLong(s);
\r
177 } catch (NumberFormatException e) {
\r
180 rolex.scheduleAtFixedRate(this, 0L, interval);
\r
181 } catch (Exception e) {
\r
182 logger.warn("PROV5005: Problem starting the synchronizer: "+e);
\r
187 * What is the state of this POD?
\r
188 * @return one of ACTIVE, STANDBY, UNKNOWN
\r
190 public int getState() {
\r
195 * Is this the active POD?
\r
196 * @return true if we are active (the master), false otherwise
\r
198 public boolean isActive() {
\r
199 return state == ACTIVE;
\r
203 * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request,
\r
204 * and that we should re-synchronize with the master.
\r
206 public void doFetch() {
\r
211 * Runs once a minute in order to <ol>
\r
212 * <li>lookup DNS names,</li>
\r
213 * <li>determine the state of this POD,</li>
\r
214 * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>
\r
215 * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
\r
219 public void run() {
\r
221 state = lookupState();
\r
222 if (state == STANDBY) {
\r
223 // Only copy provisioning data FROM the active server TO the standby
\r
224 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
\r
225 logger.debug("Initiating a sync...");
\r
226 JSONObject jo = readProvisioningJSON();
\r
229 syncFeeds( jo.getJSONArray("feeds"));
\r
230 syncSubs( jo.getJSONArray("subscriptions"));
\r
231 syncGroups( jo.getJSONArray("groups")); //Rally:US708115 - 1610
\r
232 syncParams(jo.getJSONObject("parameters"));
\r
233 // The following will not be present in a version=1.0 provfeed
\r
234 JSONArray ja = jo.optJSONArray("ingress");
\r
236 syncIngressRoutes(ja);
\r
237 JSONObject j2 = jo.optJSONObject("egress");
\r
239 syncEgressRoutes( j2);
\r
240 ja = jo.optJSONArray("routing");
\r
242 syncNetworkRoutes(ja);
\r
244 logger.info("PROV5013: Sync completed.");
\r
245 nextsynctime = System.currentTimeMillis() + ONE_HOUR;
\r
248 // Don't do fetches on non-standby PODs
\r
252 // Fetch DR logs as needed - server to server
\r
253 LogfileLoader lfl = LogfileLoader.getLoader();
\r
254 if (lfl.isIdle()) {
\r
255 // Only fetch new logs if the loader is waiting for them.
\r
256 logger.trace("Checking for logs to replicate...");
\r
257 RLEBitSet local = lfl.getBitSet();
\r
258 RLEBitSet remote = readRemoteLoglist();
\r
259 remote.andNot(local);
\r
260 if (!remote.isEmpty()) {
\r
261 logger.debug(" Replicating logs: "+remote);
\r
262 replicateDRLogs(remote);
\r
265 } catch (Exception e) {
\r
266 logger.warn("PROV0020: Caught exception in SynchronizerTask: "+e);
\r
267 e.printStackTrace();
\r
272 * This method is used to lookup the CNAME that points to the active server.
\r
273 * It returns 0 (UNKNOWN), 1(ACTIVE), or 2 (STANDBY) to indicate the state of this server.
\r
274 * @return the current state
\r
276 private int lookupState() {
\r
277 int newstate = UNKNOWN;
\r
279 InetAddress myaddr = InetAddress.getLocalHost();
\r
280 if (logger.isTraceEnabled())
\r
281 logger.trace("My address: "+myaddr);
\r
282 String this_pod = myaddr.getHostName();
\r
283 Set<String> pods = new TreeSet<String>(Arrays.asList(BaseServlet.getPods()));
\r
284 if (pods.contains(this_pod)) {
\r
285 InetAddress pserver = InetAddress.getByName(BaseServlet.active_prov_name);
\r
286 newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;
\r
287 if (logger.isDebugEnabled() && System.currentTimeMillis() >= next_msg) {
\r
288 logger.debug("Active POD = "+pserver+", Current state is "+stnames[newstate]);
\r
289 next_msg = System.currentTimeMillis() + (5 * 60 * 1000L);
\r
292 logger.warn("PROV5003: My name ("+this_pod+") is missing from the list of provisioning servers.");
\r
294 } catch (UnknownHostException e) {
\r
295 logger.warn("PROV5002: Cannot determine the name of this provisioning server.");
\r
298 if (newstate != state)
\r
299 logger.info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));
\r
302 private static long next_msg = 0; // only display the "Current state" msg every 5 mins.
\r
303 /** Synchronize the Feeds in the JSONArray, with the Feeds in the DB. */
\r
304 private void syncFeeds(JSONArray ja) {
\r
305 Collection<Syncable> coll = new ArrayList<Syncable>();
\r
306 for (int n = 0; n < ja.length(); n++) {
\r
308 Feed f = new Feed(ja.getJSONObject(n));
\r
310 } catch (Exception e) {
\r
311 logger.warn("PROV5004: Invalid object in feed: "+ja.optJSONObject(n));
\r
314 if (sync(coll, Feed.getAllFeeds()))
\r
315 BaseServlet.provisioningDataChanged();
\r
317 /** Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB. */
\r
318 private void syncSubs(JSONArray ja) {
\r
319 Collection<Syncable> coll = new ArrayList<Syncable>();
\r
320 for (int n = 0; n < ja.length(); n++) {
\r
322 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
\r
323 JSONObject j = ja.getJSONObject(n);
\r
324 j.put("sync", "true");
\r
325 Subscription s = new Subscription(j);
\r
327 } catch (Exception e) {
\r
328 logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));
\r
331 if (sync(coll, Subscription.getAllSubscriptions()))
\r
332 BaseServlet.provisioningDataChanged();
\r
335 /** Rally:US708115 - Synchronize the Groups in the JSONArray, with the Groups in the DB. */
\r
336 private void syncGroups(JSONArray ja) {
\r
337 Collection<Syncable> coll = new ArrayList<Syncable>();
\r
338 for (int n = 0; n < ja.length(); n++) {
\r
340 Group g = new Group(ja.getJSONObject(n));
\r
342 } catch (Exception e) {
\r
343 logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));
\r
346 if (sync(coll, Group.getAllgroups()))
\r
347 BaseServlet.provisioningDataChanged();
\r
351 /** Synchronize the Parameters in the JSONObject, with the Parameters in the DB. */
\r
352 private void syncParams(JSONObject jo) {
\r
353 Collection<Syncable> coll = new ArrayList<Syncable>();
\r
354 for (String k : jo.keySet()) {
\r
357 v = jo.getString(k);
\r
358 } catch (JSONException e) {
\r
360 v = ""+jo.getInt(k);
\r
361 } catch (JSONException e1) {
\r
362 JSONArray ja = jo.getJSONArray(k);
\r
363 for (int i = 0; i < ja.length(); i++) {
\r
366 v += ja.getString(i);
\r
370 coll.add(new Parameters(k, v));
\r
372 if (sync(coll, Parameters.getParameterCollection())) {
\r
373 BaseServlet.provisioningDataChanged();
\r
374 BaseServlet.provisioningParametersChanged();
\r
377 private void syncIngressRoutes(JSONArray ja) {
\r
378 Collection<Syncable> coll = new ArrayList<Syncable>();
\r
379 for (int n = 0; n < ja.length(); n++) {
\r
381 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
\r
383 } catch (NumberFormatException e) {
\r
384 logger.warn("PROV5004: Invalid object in ingress routes: "+ja.optJSONObject(n));
\r
387 if (sync(coll, IngressRoute.getAllIngressRoutes()))
\r
388 BaseServlet.provisioningDataChanged();
\r
390 private void syncEgressRoutes(JSONObject jo) {
\r
391 Collection<Syncable> coll = new ArrayList<Syncable>();
\r
392 for (String key : jo.keySet()) {
\r
394 int sub = Integer.parseInt(key);
\r
395 String node = jo.getString(key);
\r
396 EgressRoute er = new EgressRoute(sub, node);
\r
398 } catch (NumberFormatException e) {
\r
399 logger.warn("PROV5004: Invalid subid in egress routes: "+key);
\r
400 } catch (IllegalArgumentException e) {
\r
401 logger.warn("PROV5004: Invalid node name in egress routes: "+key);
\r
404 if (sync(coll, EgressRoute.getAllEgressRoutes()))
\r
405 BaseServlet.provisioningDataChanged();
\r
407 private void syncNetworkRoutes(JSONArray ja) {
\r
408 Collection<Syncable> coll = new ArrayList<Syncable>();
\r
409 for (int n = 0; n < ja.length(); n++) {
\r
411 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
\r
413 } catch (JSONException e) {
\r
414 logger.warn("PROV5004: Invalid object in network routes: "+ja.optJSONObject(n));
\r
417 if (sync(coll, NetworkRoute.getAllNetworkRoutes()))
\r
418 BaseServlet.provisioningDataChanged();
\r
420 private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
\r
421 boolean changes = false;
\r
423 Map<String, Syncable> newmap = getMap(newc);
\r
424 Map<String, Syncable> oldmap = getMap(oldc);
\r
425 Set<String> union = new TreeSet<String>(newmap.keySet());
\r
426 union.addAll(oldmap.keySet());
\r
428 @SuppressWarnings("resource")
\r
429 Connection conn = db.getConnection();
\r
430 for (String n : union) {
\r
431 Syncable newobj = newmap.get(n);
\r
432 Syncable oldobj = oldmap.get(n);
\r
433 if (oldobj == null) {
\r
434 if (logger.isDebugEnabled())
\r
435 logger.debug(" Inserting record: "+newobj);
\r
436 newobj.doInsert(conn);
\r
438 } else if (newobj == null) {
\r
439 if (logger.isDebugEnabled())
\r
440 logger.debug(" Deleting record: "+oldobj);
\r
441 oldobj.doDelete(conn);
\r
443 } else if (!newobj.equals(oldobj)) {
\r
444 if (logger.isDebugEnabled())
\r
445 logger.debug(" Updating record: "+newobj);
\r
446 newobj.doUpdate(conn);
\r
449 * Change Ownership of FEED - 1610, Syncronised with secondary DB.
\r
451 checkChnageOwner(newobj, oldobj);
\r
457 } catch (SQLException e) {
\r
458 logger.warn("PROV5009: problem during sync, exception: "+e);
\r
459 e.printStackTrace();
\r
463 private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {
\r
464 Map<String, Syncable> map = new HashMap<String, Syncable>();
\r
465 for (Syncable v : c) {
\r
466 map.put(v.getKey(), v);
\r
472 /**Change owner of FEED/SUBSCRIPTION*/
\r
474 * Change Ownership of FEED - 1610
\r
477 private void checkChnageOwner(Syncable newobj, Syncable oldobj) {
\r
478 if(newobj instanceof Feed) {
\r
479 Feed oldfeed = (Feed) oldobj;
\r
480 Feed newfeed = (Feed) newobj;
\r
482 if(!oldfeed.getPublisher().equals(newfeed.getPublisher())){
\r
483 logger.info("PROV5013 - Previous publisher: "+oldfeed.getPublisher() +": New publisher-"+newfeed.getPublisher());
\r
484 oldfeed.setPublisher(newfeed.getPublisher());
\r
485 oldfeed.changeOwnerShip();
\r
488 else if(newobj instanceof Subscription) {
\r
489 Subscription oldsub = (Subscription) oldobj;
\r
490 Subscription newsub = (Subscription) newobj;
\r
492 if(!oldsub.getSubscriber().equals(newsub.getSubscriber())){
\r
493 logger.info("PROV5013 - Previous subscriber: "+oldsub.getSubscriber() +": New subscriber-"+newsub.getSubscriber());
\r
494 oldsub.setSubscriber(newsub.getSubscriber());
\r
495 oldsub.changeOwnerShip();
\r
502 * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
\r
503 * @return the provisioning data (as a JONObject)
\r
505 private synchronized JSONObject readProvisioningJSON() {
\r
506 String url = URLUtilities.generatePeerProvURL();
\r
507 HttpGet get = new HttpGet(url);
\r
509 HttpResponse response = httpclient.execute(get);
\r
510 int code = response.getStatusLine().getStatusCode();
\r
511 if (code != HttpServletResponse.SC_OK) {
\r
512 logger.warn("PROV5010: readProvisioningJSON failed, bad error code: "+code);
\r
515 HttpEntity entity = response.getEntity();
\r
516 String ctype = entity.getContentType().getValue().trim();
\r
517 if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
\r
518 logger.warn("PROV5011: readProvisioningJSON failed, bad content type: "+ctype);
\r
521 return new JSONObject(new JSONTokener(entity.getContent()));
\r
522 } catch (Exception e) {
\r
523 logger.warn("PROV5012: readProvisioningJSON failed, exception: "+e);
\r
526 get.releaseConnection();
\r
530 * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the
\r
531 * log records available in the remote database.
\r
532 * @return the bitset
\r
534 private RLEBitSet readRemoteLoglist() {
\r
535 RLEBitSet bs = new RLEBitSet();
\r
536 String url = URLUtilities.generatePeerLogsURL();
\r
538 //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
\r
539 if(url.equals("")) {
\r
544 HttpGet get = new HttpGet(url);
\r
546 HttpResponse response = httpclient.execute(get);
\r
547 int code = response.getStatusLine().getStatusCode();
\r
548 if (code != HttpServletResponse.SC_OK) {
\r
549 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: "+code);
\r
552 HttpEntity entity = response.getEntity();
\r
553 String ctype = entity.getContentType().getValue().trim();
\r
554 if (!ctype.equals("text/plain")) {
\r
555 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: "+ctype);
\r
558 InputStream is = entity.getContent();
\r
559 ByteArrayOutputStream bos = new ByteArrayOutputStream();
\r
561 while ((ch = is.read()) >= 0)
\r
563 bs.set(bos.toString());
\r
565 } catch (Exception e) {
\r
566 logger.warn("PROV5012: readRemoteLoglist failed, exception: "+e);
\r
569 get.releaseConnection();
\r
574 * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available
\r
575 * in the remote database that we wish to copy to the local database.
\r
576 * @param bs the bitset (an RELBitSet) of log records to fetch
\r
578 private void replicateDRLogs(RLEBitSet bs) {
\r
579 String url = URLUtilities.generatePeerLogsURL();
\r
580 HttpPost post = new HttpPost(url);
\r
582 String t = bs.toString();
\r
583 HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));
\r
584 post.setEntity(body);
\r
585 if (logger.isDebugEnabled())
\r
586 logger.debug("Requesting records: "+t);
\r
588 HttpResponse response = httpclient.execute(post);
\r
589 int code = response.getStatusLine().getStatusCode();
\r
590 if (code != HttpServletResponse.SC_OK) {
\r
591 logger.warn("PROV5010: replicateDRLogs failed, bad error code: "+code);
\r
594 HttpEntity entity = response.getEntity();
\r
595 String ctype = entity.getContentType().getValue().trim();
\r
596 if (!ctype.equals("text/plain")) {
\r
597 logger.warn("PROV5011: replicateDRLogs failed, bad content type: "+ctype);
\r
601 String spoolname = "" + System.currentTimeMillis();
\r
602 Path tmppath = Paths.get(spooldir, spoolname);
\r
603 Path donepath = Paths.get(spooldir, "IN."+spoolname);
\r
604 Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
\r
605 Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
\r
606 logger.info("Approximately "+bs.cardinality()+" records replicated.");
\r
607 } catch (Exception e) {
\r
608 logger.warn("PROV5012: replicateDRLogs failed, exception: "+e);
\r
610 post.releaseConnection();
\r