X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=datarouter-prov%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fprovisioning%2Futils%2FSynchronizerTask.java;fp=datarouter-prov%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fprovisioning%2Futils%2FSynchronizerTask.java;h=5eeb45a22b82784c9a23902be2b1c8779e567194;hb=68a9ca240970fceaf12bbe91b7bad8e1d98ecd93;hp=0000000000000000000000000000000000000000;hpb=f3b3e701a48d529ee6dc88e3a867448498e23d36;p=dmaap%2Fdatarouter.git diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/SynchronizerTask.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/SynchronizerTask.java new file mode 100644 index 00000000..5eeb45a2 --- /dev/null +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/SynchronizerTask.java @@ -0,0 +1,683 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.provisioning.utils; + +import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.security.KeyStore; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.TreeSet; +import javax.servlet.http.HttpServletResponse; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.scheme.Scheme; +import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.AbstractHttpClient; +import org.apache.http.impl.client.DefaultHttpClient; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.onap.dmaap.datarouter.provisioning.BaseServlet; +import org.onap.dmaap.datarouter.provisioning.ProvRunner; +import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute; +import org.onap.dmaap.datarouter.provisioning.beans.Feed; +import org.onap.dmaap.datarouter.provisioning.beans.Group; +import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute; +import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute; +import org.onap.dmaap.datarouter.provisioning.beans.Parameters; +import org.onap.dmaap.datarouter.provisioning.beans.Subscription; +import org.onap.dmaap.datarouter.provisioning.beans.Syncable; + +/** + * This class handles synchronization between provisioning servers (PODs). It has three primary functions: + *
    + *
  1. Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to + * the active (master) POD.
  2. + *
  3. On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.
  4. + *
  5. Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD) + * of this POD.
  6. + *
+ *

For this to work correctly, the following code needs to be placed at the beginning of main().

+ * + * Security.setProperty("networkaddress.cache.ttl", "10"); + * + * + * @author Robert Eby + * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $ + */ + +public class SynchronizerTask extends TimerTask { + + /** + * This is a singleton -- there is only one SynchronizerTask object in the server. + */ + private static SynchronizerTask synctask; + + /** + * This POD is unknown -- not on the list of PODs. + */ + public static final int UNKNOWN_POD = 0; + /** + * This POD is active -- on the list of PODs, and the DNS CNAME points to us. + */ + public static final int ACTIVE_POD = 1; + /** + * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us. + */ + public static final int STANDBY_POD = 2; + + private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"}; + private static final long ONE_HOUR = 60 * 60 * 1000L; + + private long nextMsg = 0; // only display the "Current podState" msg every 5 mins. + + private final EELFLogger logger; + private final Timer rolex; + private final String spooldir; + private int podState; + private boolean doFetch; + private long nextsynctime; + private AbstractHttpClient httpclient = null; + + @SuppressWarnings("deprecation") + private SynchronizerTask() { + logger = EELFManager.getInstance().getLogger("InternalLog"); + rolex = new Timer(); + spooldir = ProvRunner.getProvProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir"); + podState = UNKNOWN_POD; + doFetch = true; // start off with a fetch + nextsynctime = 0; + + logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD"); + try { + // Set up keystore + String type = AafPropsUtils.KEYSTORE_TYPE_PROPERTY; + String store = ProvRunner.getAafPropsUtils().getKeystorePathProperty(); + String pass = ProvRunner.getAafPropsUtils().getKeystorePassProperty(); + KeyStore keyStore = KeyStore.getInstance(type); + try (FileInputStream instream = new FileInputStream(new File(store))) { + keyStore.load(instream, pass.toCharArray()); + + } + // Set up truststore + store = ProvRunner.getAafPropsUtils().getTruststorePathProperty(); + pass = ProvRunner.getAafPropsUtils().getTruststorePassProperty(); + KeyStore trustStore = null; + if (store != null && store.length() > 0) { + trustStore = KeyStore.getInstance(AafPropsUtils.TRUESTSTORE_TYPE_PROPERTY); + try (FileInputStream instream = new FileInputStream(new File(store))) { + trustStore.load(instream, pass.toCharArray()); + + } + } + + // We are connecting with the node name, but the certificate will have the CNAME + // So we need to accept a non-matching certificate name + String keystorepass = ProvRunner.getAafPropsUtils().getKeystorePassProperty(); + try (AbstractHttpClient hc = new DefaultHttpClient()) { + SSLSocketFactory socketFactory = + (trustStore == null) + ? new SSLSocketFactory(keyStore, keystorepass) + : new SSLSocketFactory(keyStore, keystorepass, trustStore); + socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); + Scheme sch = new Scheme("https", 443, socketFactory); + hc.getConnectionManager().getSchemeRegistry().register(sch); + httpclient = hc; + } + setSynchTimer(ProvRunner.getProvProperties().getProperty( + "org.onap.dmaap.datarouter.provserver.sync_interval", "5000")); + } catch (Exception e) { + logger.warn("PROV5005: Problem starting the synchronizer: " + e); + } + } + + private void setSynchTimer(String strInterval) { + // Run once every 5 seconds to check DNS, etc. + long interval; + try { + interval = Long.parseLong(strInterval); + } catch (NumberFormatException e) { + interval = 5000L; + } + rolex.scheduleAtFixedRate(this, 0L, interval); + } + + /** + * Get the singleton SynchronizerTask object. + * + * @return the SynchronizerTask + */ + public static synchronized SynchronizerTask getSynchronizer() { + if (synctask == null) { + synctask = new SynchronizerTask(); + } + return synctask; + } + + /** + * What is the podState of this POD?. + * + * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD + */ + public int getPodState() { + return podState; + } + + /** + * Is this the active POD?. + * + * @return true if we are active (the master), false otherwise + */ + public boolean isActive() { + return podState == ACTIVE_POD; + } + + /** + * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we + * should re-synchronize with the master. + */ + public void doFetch() { + doFetch = true; + } + + /** + * Runs once a minute in order to
    + *
  1. lookup DNS names,
  2. + *
  3. determine the podState of this POD,
  4. + *
  5. if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.
  6. + *
  7. if this is a standby POD, check if there are any new log records to be replicated.
  8. + *
. + */ + @Override + public void run() { + try { + podState = lookupState(); + if (podState == STANDBY_POD) { + // Only copy provisioning data FROM the active server TO the standby + if (doFetch || (System.currentTimeMillis() >= nextsynctime)) { + syncProvisioningData(); + logger.info("PROV5013: Sync completed."); + nextsynctime = System.currentTimeMillis() + ONE_HOUR; + } + } else { + // Don't do fetches on non-standby PODs + doFetch = false; + } + + // Fetch DR logs as needed - server to server + LogfileLoader lfl = LogfileLoader.getLoader(); + if (lfl.isIdle()) { + // Only fetch new logs if the loader is waiting for them. + logger.trace("Checking for logs to replicate..."); + RLEBitSet local = lfl.getBitSet(); + RLEBitSet remote = readRemoteLoglist(); + remote.andNot(local); + if (!remote.isEmpty()) { + logger.debug(" Replicating logs: " + remote); + replicateDataRouterLogs(remote); + } + } + } catch (Exception e) { + logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e); + } + } + + private void syncProvisioningData() { + logger.debug("Initiating a sync..."); + JSONObject jo = readProvisioningJson(); + if (jo != null) { + doFetch = false; + syncFeeds(jo.getJSONArray("feeds")); + syncSubs(jo.getJSONArray("subscriptions")); + syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610 + syncParams(jo.getJSONObject("parameters")); + // The following will not be present in a version=1.0 provfeed + JSONArray ja = jo.optJSONArray("ingress"); + if (ja != null) { + syncIngressRoutes(ja); + } + JSONObject j2 = jo.optJSONObject("egress"); + if (j2 != null) { + syncEgressRoutes(j2); + } + ja = jo.optJSONArray("routing"); + if (ja != null) { + syncNetworkRoutes(ja); + } + } + } + + /** + * This method is used to lookup the CNAME that points to the active server. + * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server. + * + * @return the current podState + */ + public int lookupState() { + int newPodState = UNKNOWN_POD; + try { + InetAddress myaddr = InetAddress.getLocalHost(); + if (logger.isTraceEnabled()) { + logger.trace("My address: " + myaddr); + } + String thisPod = myaddr.getHostName(); + Set pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods())); + if (pods.contains(thisPod)) { + InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName()); + newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD; + if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) { + logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]); + nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L); + } + } else { + logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers."); + } + } catch (UnknownHostException e) { + logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e); + } + + if (newPodState != podState) { + logger.info(String.format("PROV5001: Server podState changed from %s to %s", + stnames[podState], stnames[newPodState])); + } + return newPodState; + } + + /** + * Synchronize the Feeds in the JSONArray, with the Feeds in the DB. + */ + private void syncFeeds(JSONArray ja) { + Collection coll = new ArrayList<>(); + for (int n = 0; n < ja.length(); n++) { + try { + Feed feed = new Feed(ja.getJSONObject(n)); + coll.add(feed); + } catch (Exception e) { + logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n), e); + } + } + if (sync(coll, Feed.getAllFeeds())) { + BaseServlet.provisioningDataChanged(); + } + } + + /** + * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB. + */ + private void syncSubs(JSONArray ja) { + Collection coll = new ArrayList<>(); + for (int n = 0; n < ja.length(); n++) { + try { + //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047. + JSONObject jsonObject = ja.getJSONObject(n); + jsonObject.put("sync", "true"); + Subscription sub = new Subscription(jsonObject); + coll.add(sub); + } catch (Exception e) { + logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n), e); + } + } + if (sync(coll, Subscription.getAllSubscriptions())) { + BaseServlet.provisioningDataChanged(); + } + } + + /** + * Rally:US708115 - Synchronize the Groups in the JSONArray, with the Groups in the DB. + */ + private void syncGroups(JSONArray ja) { + Collection coll = new ArrayList<>(); + for (int n = 0; n < ja.length(); n++) { + try { + Group group = new Group(ja.getJSONObject(n)); + coll.add(group); + } catch (Exception e) { + logger.warn("PROV5004: Invalid object in group: " + ja.optJSONObject(n), e); + } + } + if (sync(coll, Group.getAllgroups())) { + BaseServlet.provisioningDataChanged(); + } + } + + + /** + * Synchronize the Parameters in the JSONObject, with the Parameters in the DB. + */ + private void syncParams(JSONObject jo) { + Collection coll = new ArrayList<>(); + for (String k : jo.keySet()) { + String val = ""; + try { + val = jo.getString(k); + } catch (JSONException e) { + logger.warn("PROV5004: Invalid object in parameters: " + jo.optJSONObject(k), e); + try { + val = "" + jo.getInt(k); + } catch (JSONException e1) { + logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1); + JSONArray ja = jo.getJSONArray(k); + for (int i = 0; i < ja.length(); i++) { + if (i > 0) { + val += "|"; + } + val += ja.getString(i); + } + } + } + coll.add(new Parameters(k, val)); + } + if (sync(coll, Parameters.getParameterCollection())) { + BaseServlet.provisioningDataChanged(); + BaseServlet.provisioningParametersChanged(); + } + } + + private void syncIngressRoutes(JSONArray ja) { + Collection coll = new ArrayList<>(); + for (int n = 0; n < ja.length(); n++) { + try { + IngressRoute in = new IngressRoute(ja.getJSONObject(n)); + coll.add(in); + } catch (NumberFormatException e) { + logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n)); + } + } + if (sync(coll, IngressRoute.getAllIngressRoutes())) { + BaseServlet.provisioningDataChanged(); + } + } + + private void syncEgressRoutes(JSONObject jo) { + Collection coll = new ArrayList<>(); + for (String key : jo.keySet()) { + try { + int sub = Integer.parseInt(key); + String node = jo.getString(key); + EgressRoute er = new EgressRoute(sub, node); + coll.add(er); + } catch (NumberFormatException e) { + logger.warn("PROV5004: Invalid subid in egress routes: " + key, e); + } catch (IllegalArgumentException e) { + logger.warn("PROV5004: Invalid node name in egress routes: " + key, e); + } + } + if (sync(coll, EgressRoute.getAllEgressRoutes())) { + BaseServlet.provisioningDataChanged(); + } + } + + private void syncNetworkRoutes(JSONArray ja) { + Collection coll = new ArrayList<>(); + for (int n = 0; n < ja.length(); n++) { + try { + NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n)); + coll.add(nr); + } catch (JSONException e) { + logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e); + } + } + if (sync(coll, NetworkRoute.getAllNetworkRoutes())) { + BaseServlet.provisioningDataChanged(); + } + } + + private boolean sync(Collection newc, Collection oldc) { + boolean changes = false; + try { + Map newmap = getMap(newc); + Map oldmap = getMap(oldc); + Set union = new TreeSet<>(newmap.keySet()); + union.addAll(oldmap.keySet()); + for (String n : union) { + Syncable newobj = newmap.get(n); + Syncable oldobj = oldmap.get(n); + if (oldobj == null) { + try (Connection conn = ProvDbUtils.getInstance().getConnection()) { + changes = insertRecord(conn, newobj); + } + } else if (newobj == null) { + try (Connection conn = ProvDbUtils.getInstance().getConnection()) { + changes = deleteRecord(conn, oldobj); + } + } else if (!newobj.equals(oldobj)) { + try (Connection conn = ProvDbUtils.getInstance().getConnection()) { + changes = updateRecord(conn, newobj, oldobj); + } + } + } + } catch (SQLException e) { + logger.warn("PROV5009: problem during sync, exception: " + e); + } + return changes; + } + + private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) { + if (logger.isDebugEnabled()) { + logger.debug(" Updating record: " + newobj); + } + boolean changes = newobj.doUpdate(conn); + checkChangeOwner(newobj, oldobj); + + return changes; + } + + private boolean deleteRecord(Connection conn, Syncable oldobj) { + if (logger.isDebugEnabled()) { + logger.debug(" Deleting record: " + oldobj); + } + return oldobj.doDelete(conn); + } + + private boolean insertRecord(Connection conn, Syncable newobj) { + if (logger.isDebugEnabled()) { + logger.debug(" Inserting record: " + newobj); + } + return newobj.doInsert(conn); + } + + private Map getMap(Collection coll) { + Map map = new HashMap<>(); + for (Syncable v : coll) { + map.put(v.getKey(), v); + } + return map; + } + + /** + * Change owner of FEED/SUBSCRIPTION. + * Rally US708115 Change Ownership of FEED - 1610 + */ + private void checkChangeOwner(Syncable newobj, Syncable oldobj) { + if (newobj instanceof Feed) { + Feed oldfeed = (Feed) oldobj; + Feed newfeed = (Feed) newobj; + + if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) { + logger.info("PROV5013 - Previous publisher: " + + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher()); + oldfeed.setPublisher(newfeed.getPublisher()); + oldfeed.changeOwnerShip(); + } + } else if (newobj instanceof Subscription) { + Subscription oldsub = (Subscription) oldobj; + Subscription newsub = (Subscription) newobj; + + if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) { + logger.info("PROV5013 - Previous subscriber: " + + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber()); + oldsub.setSubscriber(newsub.getSubscriber()); + oldsub.changeOwnerShip(); + } + } + + } + + /** + * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data. + * + * @return the provisioning data (as a JONObject) + */ + private synchronized JSONObject readProvisioningJson() { + String url = URLUtilities.generatePeerProvURL(); + HttpGet get = new HttpGet(url); + try { + HttpResponse response = httpclient.execute(get); + int code = response.getStatusLine().getStatusCode(); + if (code != HttpServletResponse.SC_OK) { + logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code); + return null; + } + HttpEntity entity = response.getEntity(); + String ctype = entity.getContentType().getValue().trim(); + if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) + && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) { + logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype); + return null; + } + return new JSONObject(new JSONTokener(entity.getContent())); + } catch (Exception e) { + logger.warn("PROV5012: readProvisioningJson failed, exception: " + e); + return null; + } finally { + get.releaseConnection(); + } + } + + /** + * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in + * the remote database. + * + * @return the bitset + */ + public RLEBitSet readRemoteLoglist() { + RLEBitSet bs = new RLEBitSet(); + String url = URLUtilities.generatePeerLogsURL(); + + //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset. + if ("".equals(url)) { + return bs; + } + //End of fix. + + HttpGet get = new HttpGet(url); + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + HttpResponse response = httpclient.execute(get); + int code = response.getStatusLine().getStatusCode(); + if (code != HttpServletResponse.SC_OK) { + logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code); + return bs; + } + HttpEntity entity = response.getEntity(); + String ctype = entity.getContentType().getValue().trim(); + if (!TEXT_CT.equals(ctype)) { + logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype); + return bs; + } + InputStream is = entity.getContent(); + int ch; + while ((ch = is.read()) >= 0) { + bos.write(ch); + } + bs.set(bos.toString()); + is.close(); + } catch (Exception e) { + logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e); + return bs; + } finally { + get.releaseConnection(); + } + return bs; + } + + /** + * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that + * we wish to copy to the local database. + * + * @param bs the bitset (an RELBitSet) of log records to fetch + */ + public void replicateDataRouterLogs(RLEBitSet bs) { + String url = URLUtilities.generatePeerLogsURL(); + HttpPost post = new HttpPost(url); + try { + String str = bs.toString(); + HttpEntity body = new ByteArrayEntity(str.getBytes(), ContentType.create(TEXT_CT)); + post.setEntity(body); + if (logger.isDebugEnabled()) { + logger.debug("Requesting records: " + str); + } + + HttpResponse response = httpclient.execute(post); + int code = response.getStatusLine().getStatusCode(); + if (code != HttpServletResponse.SC_OK) { + logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code); + return; + } + HttpEntity entity = response.getEntity(); + String ctype = entity.getContentType().getValue().trim(); + if (!TEXT_CT.equals(ctype)) { + logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype); + return; + } + + String spoolname = "" + System.currentTimeMillis(); + Path tmppath = Paths.get(spooldir, spoolname); + Path donepath = Paths.get(spooldir, "IN." + spoolname); + Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING); + Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING); + logger.info("Approximately " + bs.cardinality() + " records replicated."); + } catch (Exception e) { + logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e); + } finally { + post.releaseConnection(); + } + } +}