--- /dev/null
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ * * http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ * * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package org.onap.dmaap.datarouter.provisioning;\r
+\r
+import java.io.ByteArrayOutputStream;\r
+import java.io.File;\r
+import java.io.FileInputStream;\r
+import java.io.InputStream;\r
+import java.net.InetAddress;\r
+import java.net.UnknownHostException;\r
+import java.nio.file.Files;\r
+import java.nio.file.Path;\r
+import java.nio.file.Paths;\r
+import java.nio.file.StandardCopyOption;\r
+import java.security.KeyStore;\r
+import java.sql.Connection;\r
+import java.sql.SQLException;\r
+import java.util.ArrayList;\r
+import java.util.Arrays;\r
+import java.util.Collection;\r
+import java.util.HashMap;\r
+import java.util.Map;\r
+import java.util.Properties;\r
+import java.util.Set;\r
+import java.util.Timer;\r
+import java.util.TimerTask;\r
+import java.util.TreeSet;\r
+\r
+import javax.servlet.http.HttpServletResponse;\r
+\r
+import org.apache.http.HttpEntity;\r
+import org.apache.http.HttpResponse;\r
+import org.apache.http.client.methods.HttpGet;\r
+import org.apache.http.client.methods.HttpPost;\r
+import org.apache.http.conn.scheme.Scheme;\r
+import org.apache.http.conn.ssl.SSLSocketFactory;\r
+import org.apache.http.entity.ByteArrayEntity;\r
+import org.apache.http.entity.ContentType;\r
+import org.apache.http.impl.client.AbstractHttpClient;\r
+import org.apache.http.impl.client.DefaultHttpClient;\r
+import org.apache.log4j.Logger;\r
+import org.json.JSONArray;\r
+import org.json.JSONException;\r
+import org.json.JSONObject;\r
+import org.json.JSONTokener;\r
+import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;\r
+import org.onap.dmaap.datarouter.provisioning.beans.Feed;\r
+import org.onap.dmaap.datarouter.provisioning.beans.Group;\r
+import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;\r
+import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;\r
+import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
+import org.onap.dmaap.datarouter.provisioning.beans.Subscription;\r
+import org.onap.dmaap.datarouter.provisioning.beans.Syncable;\r
+import org.onap.dmaap.datarouter.provisioning.utils.DB;\r
+import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;\r
+import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;\r
+import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;\r
+\r
+/**\r
+ * This class handles synchronization between provisioning servers (PODs). It has three primary functions:\r
+ * <ol>\r
+ * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to\r
+ * the active (master) POD.</li>\r
+ * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MySQL in sync.</li>\r
+ * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)\r
+ * of this POD.</li>\r
+ * </ol>\r
+ * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>\r
+ * <code>\r
+ * Security.setProperty("networkaddress.cache.ttl", "10");\r
+ * </code>\r
+ *\r
+ * @author Robert Eby\r
+ * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $\r
+ */\r
+public class SynchronizerTask extends TimerTask {\r
+ /** This is a singleton -- there is only one SynchronizerTask object in the server */\r
+ private static SynchronizerTask synctask;\r
+\r
+ /** This POD is unknown -- not on the list of PODs */\r
+ public static final int UNKNOWN = 0;\r
+ /** This POD is active -- on the list of PODs, and the DNS CNAME points to us */\r
+ public static final int ACTIVE = 1;\r
+ /** This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us */\r
+ public static final int STANDBY = 2;\r
+ private static final String[] stnames = { "UNKNOWN", "ACTIVE", "STANDBY" };\r
+ private static final long ONE_HOUR = 60 * 60 * 1000L;\r
+\r
+ private final Logger logger;\r
+ private final Timer rolex;\r
+ private final String spooldir;\r
+ private int state;\r
+ private boolean doFetch;\r
+ private long nextsynctime;\r
+ private AbstractHttpClient httpclient = null;\r
+\r
+ /**\r
+ * Get the singleton SynchronizerTask object.\r
+ * @return the SynchronizerTask\r
+ */\r
+ public static synchronized SynchronizerTask getSynchronizer() {\r
+ if (synctask == null)\r
+ synctask = new SynchronizerTask();\r
+ return synctask;\r
+ }\r
+\r
+ @SuppressWarnings("deprecation")\r
+ private SynchronizerTask() {\r
+ logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");\r
+ rolex = new Timer();\r
+ spooldir = (new DB()).getProperties().getProperty("com.att.research.datarouter.provserver.spooldir");\r
+ state = UNKNOWN;\r
+ doFetch = true; // start off with a fetch\r
+ nextsynctime = 0;\r
+\r
+ logger.info("PROV5000: Sync task starting, server state is UNKNOWN");\r
+ try {\r
+ Properties props = (new DB()).getProperties();\r
+ String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");\r
+ String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);\r
+ String pass = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY);\r
+ KeyStore keyStore = KeyStore.getInstance(type);\r
+ FileInputStream instream = new FileInputStream(new File(store));\r
+ keyStore.load(instream, pass.toCharArray());\r
+ instream.close();\r
+\r
+ store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);\r
+ pass = props.getProperty(Main.TRUSTSTORE_PASSWORD_PROPERTY);\r
+ KeyStore trustStore = null;\r
+ if (store != null && store.length() > 0) {\r
+ trustStore = KeyStore.getInstance(KeyStore.getDefaultType());\r
+ instream = new FileInputStream(new File(store));\r
+ trustStore.load(instream, pass.toCharArray());\r
+ instream.close();\r
+ }\r
+\r
+ // We are connecting with the node name, but the certificate will have the CNAME\r
+ // So we need to accept a non-matching certificate name\r
+ String keystorepass = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref\r
+ AbstractHttpClient hc = new DefaultHttpClient();\r
+ SSLSocketFactory socketFactory =\r
+ (trustStore == null)\r
+ ? new SSLSocketFactory(keyStore, keystorepass)\r
+ : new SSLSocketFactory(keyStore, keystorepass, trustStore);\r
+ socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);\r
+ Scheme sch = new Scheme("https", 443, socketFactory);\r
+ hc.getConnectionManager().getSchemeRegistry().register(sch);\r
+ httpclient = hc;\r
+\r
+ // Run once every 5 seconds to check DNS, etc.\r
+ long interval = 0;\r
+ try {\r
+ String s = props.getProperty("com.att.research.datarouter.provserver.sync_interval", "5000");\r
+ interval = Long.parseLong(s);\r
+ } catch (NumberFormatException e) {\r
+ interval = 5000L;\r
+ }\r
+ rolex.scheduleAtFixedRate(this, 0L, interval);\r
+ } catch (Exception e) {\r
+ logger.warn("PROV5005: Problem starting the synchronizer: "+e);\r
+ }\r
+ }\r
+\r
+ /**\r
+ * What is the state of this POD?\r
+ * @return one of ACTIVE, STANDBY, UNKNOWN\r
+ */\r
+ public int getState() {\r
+ return state;\r
+ }\r
+\r
+ /**\r
+ * Is this the active POD?\r
+ * @return true if we are active (the master), false otherwise\r
+ */\r
+ public boolean isActive() {\r
+ return state == ACTIVE;\r
+ }\r
+\r
+ /**\r
+ * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request,\r
+ * and that we should re-synchronize with the master.\r
+ */\r
+ public void doFetch() {\r
+ doFetch = true;\r
+ }\r
+\r
+ /**\r
+ * Runs once a minute in order to <ol>\r
+ * <li>lookup DNS names,</li>\r
+ * <li>determine the state of this POD,</li>\r
+ * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>\r
+ * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>\r
+ * </ol>\r
+ */\r
+ @Override\r
+ public void run() {\r
+ try {\r
+ state = lookupState();\r
+ if (state == STANDBY) {\r
+ // Only copy provisioning data FROM the active server TO the standby\r
+ if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {\r
+ logger.debug("Initiating a sync...");\r
+ JSONObject jo = readProvisioningJSON();\r
+ if (jo != null) {\r
+ doFetch = false;\r
+ syncFeeds( jo.getJSONArray("feeds"));\r
+ syncSubs( jo.getJSONArray("subscriptions"));\r
+ syncGroups( jo.getJSONArray("groups")); //Rally:US708115 - 1610\r
+ syncParams(jo.getJSONObject("parameters"));\r
+ // The following will not be present in a version=1.0 provfeed\r
+ JSONArray ja = jo.optJSONArray("ingress");\r
+ if (ja != null)\r
+ syncIngressRoutes(ja);\r
+ JSONObject j2 = jo.optJSONObject("egress");\r
+ if (j2 != null)\r
+ syncEgressRoutes( j2);\r
+ ja = jo.optJSONArray("routing");\r
+ if (ja != null)\r
+ syncNetworkRoutes(ja);\r
+ }\r
+ logger.info("PROV5013: Sync completed.");\r
+ nextsynctime = System.currentTimeMillis() + ONE_HOUR;\r
+ }\r
+ } else {\r
+ // Don't do fetches on non-standby PODs\r
+ doFetch = false;\r
+ }\r
+\r
+ // Fetch DR logs as needed - server to server\r
+ LogfileLoader lfl = LogfileLoader.getLoader();\r
+ if (lfl.isIdle()) {\r
+ // Only fetch new logs if the loader is waiting for them.\r
+ logger.trace("Checking for logs to replicate...");\r
+ RLEBitSet local = lfl.getBitSet();\r
+ RLEBitSet remote = readRemoteLoglist();\r
+ remote.andNot(local);\r
+ if (!remote.isEmpty()) {\r
+ logger.debug(" Replicating logs: "+remote);\r
+ replicateDRLogs(remote);\r
+ }\r
+ }\r
+ } catch (Exception e) {\r
+ logger.warn("PROV0020: Caught exception in SynchronizerTask: "+e);\r
+ e.printStackTrace();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * This method is used to lookup the CNAME that points to the active server.\r
+ * It returns 0 (UNKNOWN), 1(ACTIVE), or 2 (STANDBY) to indicate the state of this server.\r
+ * @return the current state\r
+ */\r
+ private int lookupState() {\r
+ int newstate = UNKNOWN;\r
+ try {\r
+ InetAddress myaddr = InetAddress.getLocalHost();\r
+ if (logger.isTraceEnabled())\r
+ logger.trace("My address: "+myaddr);\r
+ String this_pod = myaddr.getHostName();\r
+ Set<String> pods = new TreeSet<String>(Arrays.asList(BaseServlet.getPods()));\r
+ if (pods.contains(this_pod)) {\r
+ InetAddress pserver = InetAddress.getByName(BaseServlet.active_prov_name);\r
+ newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;\r
+ if (logger.isDebugEnabled() && System.currentTimeMillis() >= next_msg) {\r
+ logger.debug("Active POD = "+pserver+", Current state is "+stnames[newstate]);\r
+ next_msg = System.currentTimeMillis() + (5 * 60 * 1000L);\r
+ }\r
+ } else {\r
+ logger.warn("PROV5003: My name ("+this_pod+") is missing from the list of provisioning servers.");\r
+ }\r
+ } catch (UnknownHostException e) {\r
+ logger.warn("PROV5002: Cannot determine the name of this provisioning server.");\r
+ }\r
+\r
+ if (newstate != state)\r
+ logger.info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));\r
+ return newstate;\r
+ }\r
+ private static long next_msg = 0; // only display the "Current state" msg every 5 mins.\r
+ /** Synchronize the Feeds in the JSONArray, with the Feeds in the DB. */\r
+ private void syncFeeds(JSONArray ja) {\r
+ Collection<Syncable> coll = new ArrayList<Syncable>();\r
+ for (int n = 0; n < ja.length(); n++) {\r
+ try {\r
+ Feed f = new Feed(ja.getJSONObject(n));\r
+ coll.add(f);\r
+ } catch (Exception e) {\r
+ logger.warn("PROV5004: Invalid object in feed: "+ja.optJSONObject(n));\r
+ }\r
+ }\r
+ if (sync(coll, Feed.getAllFeeds()))\r
+ BaseServlet.provisioningDataChanged();\r
+ }\r
+ /** Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB. */\r
+ private void syncSubs(JSONArray ja) {\r
+ Collection<Syncable> coll = new ArrayList<Syncable>();\r
+ for (int n = 0; n < ja.length(); n++) {\r
+ try {\r
+ //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.\r
+ JSONObject j = ja.getJSONObject(n); \r
+ j.put("sync", "true");\r
+ Subscription s = new Subscription(j);\r
+ coll.add(s);\r
+ } catch (Exception e) {\r
+ logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));\r
+ }\r
+ }\r
+ if (sync(coll, Subscription.getAllSubscriptions()))\r
+ BaseServlet.provisioningDataChanged();\r
+ }\r
+\r
+ /** Rally:US708115 - Synchronize the Groups in the JSONArray, with the Groups in the DB. */ \r
+ private void syncGroups(JSONArray ja) { \r
+ Collection<Syncable> coll = new ArrayList<Syncable>(); \r
+ for (int n = 0; n < ja.length(); n++) { \r
+ try { \r
+ Group g = new Group(ja.getJSONObject(n)); \r
+ coll.add(g); \r
+ } catch (Exception e) { \r
+ logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n)); \r
+ } \r
+ } \r
+ if (sync(coll, Group.getAllgroups())) \r
+ BaseServlet.provisioningDataChanged(); \r
+ }\r
+\r
+\r
+ /** Synchronize the Parameters in the JSONObject, with the Parameters in the DB. */\r
+ private void syncParams(JSONObject jo) {\r
+ Collection<Syncable> coll = new ArrayList<Syncable>();\r
+ for (String k : jo.keySet()) {\r
+ String v = "";\r
+ try {\r
+ v = jo.getString(k);\r
+ } catch (JSONException e) {\r
+ try {\r
+ v = ""+jo.getInt(k);\r
+ } catch (JSONException e1) {\r
+ JSONArray ja = jo.getJSONArray(k);\r
+ for (int i = 0; i < ja.length(); i++) {\r
+ if (i > 0)\r
+ v += "|";\r
+ v += ja.getString(i);\r
+ }\r
+ }\r
+ }\r
+ coll.add(new Parameters(k, v));\r
+ }\r
+ if (sync(coll, Parameters.getParameterCollection())) {\r
+ BaseServlet.provisioningDataChanged();\r
+ BaseServlet.provisioningParametersChanged();\r
+ }\r
+ }\r
+ private void syncIngressRoutes(JSONArray ja) {\r
+ Collection<Syncable> coll = new ArrayList<Syncable>();\r
+ for (int n = 0; n < ja.length(); n++) {\r
+ try {\r
+ IngressRoute in = new IngressRoute(ja.getJSONObject(n));\r
+ coll.add(in);\r
+ } catch (NumberFormatException e) {\r
+ logger.warn("PROV5004: Invalid object in ingress routes: "+ja.optJSONObject(n));\r
+ }\r
+ }\r
+ if (sync(coll, IngressRoute.getAllIngressRoutes()))\r
+ BaseServlet.provisioningDataChanged();\r
+ }\r
+ private void syncEgressRoutes(JSONObject jo) {\r
+ Collection<Syncable> coll = new ArrayList<Syncable>();\r
+ for (String key : jo.keySet()) {\r
+ try {\r
+ int sub = Integer.parseInt(key);\r
+ String node = jo.getString(key);\r
+ EgressRoute er = new EgressRoute(sub, node);\r
+ coll.add(er);\r
+ } catch (NumberFormatException e) {\r
+ logger.warn("PROV5004: Invalid subid in egress routes: "+key);\r
+ } catch (IllegalArgumentException e) {\r
+ logger.warn("PROV5004: Invalid node name in egress routes: "+key);\r
+ }\r
+ }\r
+ if (sync(coll, EgressRoute.getAllEgressRoutes()))\r
+ BaseServlet.provisioningDataChanged();\r
+ }\r
+ private void syncNetworkRoutes(JSONArray ja) {\r
+ Collection<Syncable> coll = new ArrayList<Syncable>();\r
+ for (int n = 0; n < ja.length(); n++) {\r
+ try {\r
+ NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));\r
+ coll.add(nr);\r
+ } catch (JSONException e) {\r
+ logger.warn("PROV5004: Invalid object in network routes: "+ja.optJSONObject(n));\r
+ }\r
+ }\r
+ if (sync(coll, NetworkRoute.getAllNetworkRoutes()))\r
+ BaseServlet.provisioningDataChanged();\r
+ }\r
+ private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {\r
+ boolean changes = false;\r
+ try {\r
+ Map<String, Syncable> newmap = getMap(newc);\r
+ Map<String, Syncable> oldmap = getMap(oldc);\r
+ Set<String> union = new TreeSet<String>(newmap.keySet());\r
+ union.addAll(oldmap.keySet());\r
+ DB db = new DB();\r
+ @SuppressWarnings("resource")\r
+ Connection conn = db.getConnection();\r
+ for (String n : union) {\r
+ Syncable newobj = newmap.get(n);\r
+ Syncable oldobj = oldmap.get(n);\r
+ if (oldobj == null) {\r
+ if (logger.isDebugEnabled())\r
+ logger.debug(" Inserting record: "+newobj);\r
+ newobj.doInsert(conn);\r
+ changes = true;\r
+ } else if (newobj == null) {\r
+ if (logger.isDebugEnabled())\r
+ logger.debug(" Deleting record: "+oldobj);\r
+ oldobj.doDelete(conn);\r
+ changes = true;\r
+ } else if (!newobj.equals(oldobj)) {\r
+ if (logger.isDebugEnabled())\r
+ logger.debug(" Updating record: "+newobj);\r
+ newobj.doUpdate(conn);\r
+\r
+ /**Rally US708115\r
+ * Change Ownership of FEED - 1610, Syncronised with secondary DB.\r
+ * */\r
+ checkChnageOwner(newobj, oldobj);\r
+\r
+ changes = true;\r
+ }\r
+ }\r
+ db.release(conn);\r
+ } catch (SQLException e) {\r
+ logger.warn("PROV5009: problem during sync, exception: "+e);\r
+ e.printStackTrace();\r
+ }\r
+ return changes;\r
+ }\r
+ private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {\r
+ Map<String, Syncable> map = new HashMap<String, Syncable>();\r
+ for (Syncable v : c) {\r
+ map.put(v.getKey(), v);\r
+ }\r
+ return map;\r
+ }\r
+ \r
+\r
+ /**Change owner of FEED/SUBSCRIPTION*/\r
+ /**Rally US708115\r
+ * Change Ownership of FEED - 1610\r
+ * \r
+ * */\r
+ private void checkChnageOwner(Syncable newobj, Syncable oldobj) {\r
+ if(newobj instanceof Feed) {\r
+ Feed oldfeed = (Feed) oldobj;\r
+ Feed newfeed = (Feed) newobj;\r
+ \r
+ if(!oldfeed.getPublisher().equals(newfeed.getPublisher())){\r
+ logger.info("PROV5013 - Previous publisher: "+oldfeed.getPublisher() +": New publisher-"+newfeed.getPublisher());\r
+ oldfeed.setPublisher(newfeed.getPublisher());\r
+ oldfeed.changeOwnerShip();\r
+ }\r
+ }\r
+ else if(newobj instanceof Subscription) {\r
+ Subscription oldsub = (Subscription) oldobj;\r
+ Subscription newsub = (Subscription) newobj;\r
+ \r
+ if(!oldsub.getSubscriber().equals(newsub.getSubscriber())){\r
+ logger.info("PROV5013 - Previous subscriber: "+oldsub.getSubscriber() +": New subscriber-"+newsub.getSubscriber());\r
+ oldsub.setSubscriber(newsub.getSubscriber());\r
+ oldsub.changeOwnerShip();\r
+ }\r
+ }\r
+ \r
+ }\r
+\r
+ /**\r
+ * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.\r
+ * @return the provisioning data (as a JONObject)\r
+ */\r
+ private synchronized JSONObject readProvisioningJSON() {\r
+ String url = URLUtilities.generatePeerProvURL();\r
+ HttpGet get = new HttpGet(url);\r
+ try {\r
+ HttpResponse response = httpclient.execute(get);\r
+ int code = response.getStatusLine().getStatusCode();\r
+ if (code != HttpServletResponse.SC_OK) {\r
+ logger.warn("PROV5010: readProvisioningJSON failed, bad error code: "+code);\r
+ return null;\r
+ }\r
+ HttpEntity entity = response.getEntity();\r
+ String ctype = entity.getContentType().getValue().trim();\r
+ if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {\r
+ logger.warn("PROV5011: readProvisioningJSON failed, bad content type: "+ctype);\r
+ return null;\r
+ }\r
+ return new JSONObject(new JSONTokener(entity.getContent()));\r
+ } catch (Exception e) {\r
+ logger.warn("PROV5012: readProvisioningJSON failed, exception: "+e);\r
+ return null;\r
+ } finally {\r
+ get.releaseConnection();\r
+ }\r
+ }\r
+ /**\r
+ * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the\r
+ * log records available in the remote database.\r
+ * @return the bitset\r
+ */\r
+ private RLEBitSet readRemoteLoglist() {\r
+ RLEBitSet bs = new RLEBitSet();\r
+ String url = URLUtilities.generatePeerLogsURL();\r
+\r
+ //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.\r
+ if(url.equals("")) {\r
+ return bs;\r
+ }\r
+ //End of fix.\r
+\r
+ HttpGet get = new HttpGet(url);\r
+ try {\r
+ HttpResponse response = httpclient.execute(get);\r
+ int code = response.getStatusLine().getStatusCode();\r
+ if (code != HttpServletResponse.SC_OK) {\r
+ logger.warn("PROV5010: readRemoteLoglist failed, bad error code: "+code);\r
+ return bs;\r
+ }\r
+ HttpEntity entity = response.getEntity();\r
+ String ctype = entity.getContentType().getValue().trim();\r
+ if (!ctype.equals("text/plain")) {\r
+ logger.warn("PROV5011: readRemoteLoglist failed, bad content type: "+ctype);\r
+ return bs;\r
+ }\r
+ InputStream is = entity.getContent();\r
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();\r
+ int ch = 0;\r
+ while ((ch = is.read()) >= 0)\r
+ bos.write(ch);\r
+ bs.set(bos.toString());\r
+ is.close();\r
+ } catch (Exception e) {\r
+ logger.warn("PROV5012: readRemoteLoglist failed, exception: "+e);\r
+ return bs;\r
+ } finally {\r
+ get.releaseConnection();\r
+ }\r
+ return bs;\r
+ }\r
+ /**\r
+ * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available\r
+ * in the remote database that we wish to copy to the local database.\r
+ * @param bs the bitset (an RELBitSet) of log records to fetch\r
+ */\r
+ private void replicateDRLogs(RLEBitSet bs) {\r
+ String url = URLUtilities.generatePeerLogsURL();\r
+ HttpPost post = new HttpPost(url);\r
+ try {\r
+ String t = bs.toString();\r
+ HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));\r
+ post.setEntity(body);\r
+ if (logger.isDebugEnabled())\r
+ logger.debug("Requesting records: "+t);\r
+\r
+ HttpResponse response = httpclient.execute(post);\r
+ int code = response.getStatusLine().getStatusCode();\r
+ if (code != HttpServletResponse.SC_OK) {\r
+ logger.warn("PROV5010: replicateDRLogs failed, bad error code: "+code);\r
+ return;\r
+ }\r
+ HttpEntity entity = response.getEntity();\r
+ String ctype = entity.getContentType().getValue().trim();\r
+ if (!ctype.equals("text/plain")) {\r
+ logger.warn("PROV5011: replicateDRLogs failed, bad content type: "+ctype);\r
+ return;\r
+ }\r
+\r
+ String spoolname = "" + System.currentTimeMillis();\r
+ Path tmppath = Paths.get(spooldir, spoolname);\r
+ Path donepath = Paths.get(spooldir, "IN."+spoolname);\r
+ Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);\r
+ Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);\r
+ logger.info("Approximately "+bs.cardinality()+" records replicated.");\r
+ } catch (Exception e) {\r
+ logger.warn("PROV5012: replicateDRLogs failed, exception: "+e);\r
+ } finally {\r
+ post.releaseConnection();\r
+ }\r
+ }\r
+}\r