datarouter-prov code clean - remove tabs
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / SynchronizerTask.java
index 2e0fc21..898a3f0 100644 (file)
-/*******************************************************************************\r
- * ============LICENSE_START==================================================\r
- * * org.onap.dmaap\r
- * * ===========================================================================\r
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * * ===========================================================================\r
- * * Licensed under the Apache License, Version 2.0 (the "License");\r
- * * you may not use this file except in compliance with the License.\r
- * * You may obtain a copy of the License at\r
- * * \r
- *  *      http://www.apache.org/licenses/LICENSE-2.0\r
- * * \r
- *  * Unless required by applicable law or agreed to in writing, software\r
- * * distributed under the License is distributed on an "AS IS" BASIS,\r
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * * See the License for the specific language governing permissions and\r
- * * limitations under the License.\r
- * * ============LICENSE_END====================================================\r
- * *\r
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
- * *\r
- ******************************************************************************/\r
-\r
-\r
-package org.onap.dmaap.datarouter.provisioning;\r
-\r
-import java.io.ByteArrayOutputStream;\r
-import java.io.File;\r
-import java.io.FileInputStream;\r
-import java.io.InputStream;\r
-import java.net.InetAddress;\r
-import java.net.UnknownHostException;\r
-import java.nio.file.Files;\r
-import java.nio.file.Path;\r
-import java.nio.file.Paths;\r
-import java.nio.file.StandardCopyOption;\r
-import java.security.KeyStore;\r
-import java.sql.Connection;\r
-import java.sql.SQLException;\r
-import java.util.ArrayList;\r
-import java.util.Arrays;\r
-import java.util.Collection;\r
-import java.util.HashMap;\r
-import java.util.Map;\r
-import java.util.Properties;\r
-import java.util.Set;\r
-import java.util.Timer;\r
-import java.util.TimerTask;\r
-import java.util.TreeSet;\r
-\r
-import javax.servlet.http.HttpServletResponse;\r
-\r
-import org.apache.http.HttpEntity;\r
-import org.apache.http.HttpResponse;\r
-import org.apache.http.client.methods.HttpGet;\r
-import org.apache.http.client.methods.HttpPost;\r
-import org.apache.http.conn.scheme.Scheme;\r
-import org.apache.http.conn.ssl.SSLSocketFactory;\r
-import org.apache.http.entity.ByteArrayEntity;\r
-import org.apache.http.entity.ContentType;\r
-import org.apache.http.impl.client.AbstractHttpClient;\r
-import org.apache.http.impl.client.DefaultHttpClient;\r
-import org.apache.log4j.Logger;\r
-import org.json.JSONArray;\r
-import org.json.JSONException;\r
-import org.json.JSONObject;\r
-import org.json.JSONTokener;\r
-import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;\r
-import org.onap.dmaap.datarouter.provisioning.beans.Feed;\r
-import org.onap.dmaap.datarouter.provisioning.beans.Group;\r
-import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;\r
-import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;\r
-import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
-import org.onap.dmaap.datarouter.provisioning.beans.Subscription;\r
-import org.onap.dmaap.datarouter.provisioning.beans.Syncable;\r
-import org.onap.dmaap.datarouter.provisioning.utils.DB;\r
-import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;\r
-import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;\r
-import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;\r
-\r
-/**\r
- * This class handles synchronization between provisioning servers (PODs).  It has three primary functions:\r
- * <ol>\r
- * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to\r
- * the active (master) POD.</li>\r
- * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>\r
- * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)\r
- * of this POD.</li>\r
- * </ol>\r
- * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>\r
- * <code>\r
- *             Security.setProperty("networkaddress.cache.ttl", "10");\r
- * </code>\r
- *\r
- * @author Robert Eby\r
- * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $\r
- */\r
-public class SynchronizerTask extends TimerTask {\r
-       /** This is a singleton -- there is only one SynchronizerTask object in the server */\r
-       private static SynchronizerTask synctask;\r
-\r
-       /** This POD is unknown -- not on the list of PODs */\r
-       public static final int UNKNOWN = 0;\r
-       /** This POD is active -- on the list of PODs, and the DNS CNAME points to us */\r
-       public static final int ACTIVE = 1;\r
-       /** This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us */\r
-       public static final int STANDBY = 2;\r
-       private static final String[] stnames = { "UNKNOWN", "ACTIVE", "STANDBY" };\r
-       private static final long ONE_HOUR = 60 * 60 * 1000L;\r
-\r
-       private final Logger logger;\r
-       private final Timer rolex;\r
-       private final String spooldir;\r
-       private int state;\r
-       private boolean doFetch;\r
-       private long nextsynctime;\r
-       private AbstractHttpClient httpclient = null;\r
-\r
-       /**\r
-        * Get the singleton SynchronizerTask object.\r
-        * @return the SynchronizerTask\r
-        */\r
-       public static synchronized SynchronizerTask getSynchronizer() {\r
-               if (synctask == null)\r
-                       synctask = new SynchronizerTask();\r
-               return synctask;\r
-       }\r
-\r
-       @SuppressWarnings("deprecation")\r
-       private SynchronizerTask() {\r
-               logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");\r
-               rolex = new Timer();\r
-               spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");\r
-               state = UNKNOWN;\r
-               doFetch = true;         // start off with a fetch\r
-               nextsynctime = 0;\r
-\r
-               logger.info("PROV5000: Sync task starting, server state is UNKNOWN");\r
-               try {\r
-                       Properties props = (new DB()).getProperties();\r
-                       String type  = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");\r
-                       String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);\r
-                       String pass  = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY);\r
-                       KeyStore keyStore = KeyStore.getInstance(type);\r
-                       FileInputStream instream = new FileInputStream(new File(store));\r
-                       keyStore.load(instream, pass.toCharArray());\r
-                       instream.close();\r
-\r
-                       store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);\r
-                       pass  = props.getProperty(Main.TRUSTSTORE_PASSWORD_PROPERTY);\r
-                       KeyStore trustStore = null;\r
-                       if (store != null && store.length() > 0) {\r
-                               trustStore = KeyStore.getInstance(KeyStore.getDefaultType());\r
-                               instream = new FileInputStream(new File(store));\r
-                               trustStore.load(instream, pass.toCharArray());\r
-                               instream.close();\r
-                       }\r
-\r
-                       // We are connecting with the node name, but the certificate will have the CNAME\r
-                       // So we need to accept a non-matching certificate name\r
-                       String keystorepass  = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref\r
-                       AbstractHttpClient hc = new DefaultHttpClient();\r
-                       SSLSocketFactory socketFactory =\r
-                               (trustStore == null)\r
-                               ? new SSLSocketFactory(keyStore, keystorepass)\r
-                               : new SSLSocketFactory(keyStore, keystorepass, trustStore);\r
-                       socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);\r
-                       Scheme sch = new Scheme("https", 443, socketFactory);\r
-                       hc.getConnectionManager().getSchemeRegistry().register(sch);\r
-                       httpclient = hc;\r
-\r
-                       // Run once every 5 seconds to check DNS, etc.\r
-                       long interval = 0;\r
-                       try {\r
-                               String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");\r
-                               interval = Long.parseLong(s);\r
-                       } catch (NumberFormatException e) {\r
-                               interval = 5000L;\r
-                       }\r
-                       rolex.scheduleAtFixedRate(this, 0L, interval);\r
-               } catch (Exception e) {\r
-                       logger.warn("PROV5005: Problem starting the synchronizer: "+e);\r
-               }\r
-       }\r
-\r
-       /**\r
-        * What is the state of this POD?\r
-        * @return one of ACTIVE, STANDBY, UNKNOWN\r
-        */\r
-       public int getState() {\r
-               return state;\r
-       }\r
-\r
-       /**\r
-        * Is this the active POD?\r
-        * @return true if we are active (the master), false otherwise\r
-        */\r
-       public boolean isActive() {\r
-               return state == ACTIVE;\r
-       }\r
-\r
-       /**\r
-        * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request,\r
-        * and that we should re-synchronize with the master.\r
-        */\r
-       public void doFetch() {\r
-               doFetch = true;\r
-       }\r
-\r
-       /**\r
-        * Runs once a minute in order to <ol>\r
-        * <li>lookup DNS names,</li>\r
-        * <li>determine the state of this POD,</li>\r
-        * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>\r
-        * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>\r
-        * </ol>\r
-        */\r
-       @Override\r
-       public void run() {\r
-               try {\r
-                       state = lookupState();\r
-                       if (state == STANDBY) {\r
-                               // Only copy provisioning data FROM the active server TO the standby\r
-                               if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {\r
-                                       logger.debug("Initiating a sync...");\r
-                                       JSONObject jo = readProvisioningJSON();\r
-                                       if (jo != null) {\r
-                                               doFetch = false;\r
-                                               syncFeeds( jo.getJSONArray("feeds"));\r
-                                               syncSubs(  jo.getJSONArray("subscriptions"));\r
-                                               syncGroups(  jo.getJSONArray("groups")); //Rally:US708115 - 1610\r
-                                               syncParams(jo.getJSONObject("parameters"));\r
-                                               // The following will not be present in a version=1.0 provfeed\r
-                                               JSONArray ja = jo.optJSONArray("ingress");\r
-                                               if (ja != null)\r
-                                                       syncIngressRoutes(ja);\r
-                                               JSONObject j2 = jo.optJSONObject("egress");\r
-                                               if (j2 != null)\r
-                                                       syncEgressRoutes( j2);\r
-                                               ja = jo.optJSONArray("routing");\r
-                                               if (ja != null)\r
-                                                       syncNetworkRoutes(ja);\r
-                                       }\r
-                                       logger.info("PROV5013: Sync completed.");\r
-                                       nextsynctime = System.currentTimeMillis() + ONE_HOUR;\r
-                               }\r
-                       } else {\r
-                               // Don't do fetches on non-standby PODs\r
-                               doFetch = false;\r
-                       }\r
-\r
-                       // Fetch DR logs as needed - server to server\r
-                       LogfileLoader lfl = LogfileLoader.getLoader();\r
-                       if (lfl.isIdle()) {\r
-                               // Only fetch new logs if the loader is waiting for them.\r
-                               logger.trace("Checking for logs to replicate...");\r
-                               RLEBitSet local  = lfl.getBitSet();\r
-                               RLEBitSet remote = readRemoteLoglist();\r
-                               remote.andNot(local);\r
-                               if (!remote.isEmpty()) {\r
-                                       logger.debug(" Replicating logs: "+remote);\r
-                                       replicateDRLogs(remote);\r
-                               }\r
-                       }\r
-               } catch (Exception e) {\r
-                       logger.warn("PROV0020: Caught exception in SynchronizerTask: "+e);\r
-                       e.printStackTrace();\r
-               }\r
-       }\r
-\r
-       /**\r
-        * This method is used to lookup the CNAME that points to the active server.\r
-        * It returns 0 (UNKNOWN), 1(ACTIVE), or 2 (STANDBY) to indicate the state of this server.\r
-        * @return the current state\r
-        */\r
-       private int lookupState() {\r
-               int newstate = UNKNOWN;\r
-               try {\r
-                       InetAddress myaddr = InetAddress.getLocalHost();\r
-                       if (logger.isTraceEnabled())\r
-                               logger.trace("My address: "+myaddr);\r
-                       String this_pod = myaddr.getHostName();\r
-                       Set<String> pods = new TreeSet<String>(Arrays.asList(BaseServlet.getPods()));\r
-                       if (pods.contains(this_pod)) {\r
-                               InetAddress pserver = InetAddress.getByName(BaseServlet.active_prov_name);\r
-                               newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;\r
-                               if (logger.isDebugEnabled() && System.currentTimeMillis() >= next_msg) {\r
-                                       logger.debug("Active POD = "+pserver+", Current state is "+stnames[newstate]);\r
-                                       next_msg = System.currentTimeMillis() + (5 * 60 * 1000L);\r
-                               }\r
-                       } else {\r
-                               logger.warn("PROV5003: My name ("+this_pod+") is missing from the list of provisioning servers.");\r
-                       }\r
-               } catch (UnknownHostException e) {\r
-                       logger.warn("PROV5002: Cannot determine the name of this provisioning server.");\r
-               }\r
-\r
-               if (newstate != state)\r
-                       logger.info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));\r
-               return newstate;\r
-       }\r
-       private static long next_msg = 0;       // only display the "Current state" msg every 5 mins.\r
-       /** Synchronize the Feeds in the JSONArray, with the Feeds in the DB. */\r
-       private void syncFeeds(JSONArray ja) {\r
-               Collection<Syncable> coll = new ArrayList<Syncable>();\r
-               for (int n = 0; n < ja.length(); n++) {\r
-                       try {\r
-                               Feed f = new Feed(ja.getJSONObject(n));\r
-                               coll.add(f);\r
-                       } catch (Exception e) {\r
-                               logger.warn("PROV5004: Invalid object in feed: "+ja.optJSONObject(n));\r
-                       }\r
-               }\r
-               if (sync(coll, Feed.getAllFeeds()))\r
-                       BaseServlet.provisioningDataChanged();\r
-       }\r
-       /** Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB. */\r
-       private void syncSubs(JSONArray ja) {\r
-               Collection<Syncable> coll = new ArrayList<Syncable>();\r
-               for (int n = 0; n < ja.length(); n++) {\r
-                       try {\r
-                               //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.\r
-                               JSONObject j = ja.getJSONObject(n);      \r
-                               j.put("sync", "true");\r
-                               Subscription s = new Subscription(j);\r
-                               coll.add(s);\r
-                       } catch (Exception e) {\r
-                               logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));\r
-                       }\r
-               }\r
-               if (sync(coll, Subscription.getAllSubscriptions()))\r
-                       BaseServlet.provisioningDataChanged();\r
-       }\r
-\r
-       /**  Rally:US708115  - Synchronize the Groups in the JSONArray, with the Groups in the DB. */           \r
-       private void syncGroups(JSONArray ja) {         \r
-               Collection<Syncable> coll = new ArrayList<Syncable>();          \r
-               for (int n = 0; n < ja.length(); n++) {         \r
-                       try {           \r
-                               Group g = new Group(ja.getJSONObject(n));               \r
-                               coll.add(g);            \r
-                       } catch (Exception e) {         \r
-                               logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));          \r
-                       }               \r
-               }               \r
-               if (sync(coll, Group.getAllgroups()))           \r
-                       BaseServlet.provisioningDataChanged();          \r
-       }\r
-\r
-\r
-       /** Synchronize the Parameters in the JSONObject, with the Parameters in the DB. */\r
-       private void syncParams(JSONObject jo) {\r
-               Collection<Syncable> coll = new ArrayList<Syncable>();\r
-               for (String k : jo.keySet()) {\r
-                       String v = "";\r
-                       try {\r
-                               v = jo.getString(k);\r
-                       } catch (JSONException e) {\r
-                               try {\r
-                                       v = ""+jo.getInt(k);\r
-                               } catch (JSONException e1) {\r
-                                       JSONArray ja = jo.getJSONArray(k);\r
-                                       for (int i = 0; i < ja.length(); i++) {\r
-                                               if (i > 0)\r
-                                                       v += "|";\r
-                                               v += ja.getString(i);\r
-                                       }\r
-                               }\r
-                       }\r
-                       coll.add(new Parameters(k, v));\r
-               }\r
-               if (sync(coll, Parameters.getParameterCollection())) {\r
-                       BaseServlet.provisioningDataChanged();\r
-                       BaseServlet.provisioningParametersChanged();\r
-               }\r
-       }\r
-       private void syncIngressRoutes(JSONArray ja) {\r
-               Collection<Syncable> coll = new ArrayList<Syncable>();\r
-               for (int n = 0; n < ja.length(); n++) {\r
-                       try {\r
-                               IngressRoute in = new IngressRoute(ja.getJSONObject(n));\r
-                               coll.add(in);\r
-                       } catch (NumberFormatException e) {\r
-                               logger.warn("PROV5004: Invalid object in ingress routes: "+ja.optJSONObject(n));\r
-                       }\r
-               }\r
-               if (sync(coll, IngressRoute.getAllIngressRoutes()))\r
-                       BaseServlet.provisioningDataChanged();\r
-       }\r
-       private void syncEgressRoutes(JSONObject jo) {\r
-               Collection<Syncable> coll = new ArrayList<Syncable>();\r
-               for (String key : jo.keySet()) {\r
-                       try {\r
-                               int sub = Integer.parseInt(key);\r
-                               String node = jo.getString(key);\r
-                               EgressRoute er = new EgressRoute(sub, node);\r
-                               coll.add(er);\r
-                       } catch (NumberFormatException e) {\r
-                               logger.warn("PROV5004: Invalid subid in egress routes: "+key);\r
-                       } catch (IllegalArgumentException e) {\r
-                               logger.warn("PROV5004: Invalid node name in egress routes: "+key);\r
-                       }\r
-               }\r
-               if (sync(coll, EgressRoute.getAllEgressRoutes()))\r
-                       BaseServlet.provisioningDataChanged();\r
-       }\r
-       private void syncNetworkRoutes(JSONArray ja) {\r
-               Collection<Syncable> coll = new ArrayList<Syncable>();\r
-               for (int n = 0; n < ja.length(); n++) {\r
-                       try {\r
-                               NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));\r
-                               coll.add(nr);\r
-                       } catch (JSONException e) {\r
-                               logger.warn("PROV5004: Invalid object in network routes: "+ja.optJSONObject(n));\r
-                       }\r
-               }\r
-               if (sync(coll, NetworkRoute.getAllNetworkRoutes()))\r
-                       BaseServlet.provisioningDataChanged();\r
-       }\r
-       private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {\r
-               boolean changes = false;\r
-               try {\r
-                       Map<String, Syncable> newmap = getMap(newc);\r
-                       Map<String, Syncable> oldmap = getMap(oldc);\r
-                       Set<String> union = new TreeSet<String>(newmap.keySet());\r
-                       union.addAll(oldmap.keySet());\r
-                       DB db = new DB();\r
-                       @SuppressWarnings("resource")\r
-                       Connection conn = db.getConnection();\r
-                       for (String n : union) {\r
-                               Syncable newobj = newmap.get(n);\r
-                               Syncable oldobj = oldmap.get(n);\r
-                               if (oldobj == null) {\r
-                                       if (logger.isDebugEnabled())\r
-                                               logger.debug("  Inserting record: "+newobj);\r
-                                       newobj.doInsert(conn);\r
-                                       changes = true;\r
-                               } else if (newobj == null) {\r
-                                       if (logger.isDebugEnabled())\r
-                                               logger.debug("  Deleting record: "+oldobj);\r
-                                       oldobj.doDelete(conn);\r
-                                       changes = true;\r
-                               } else if (!newobj.equals(oldobj)) {\r
-                                       if (logger.isDebugEnabled())\r
-                                               logger.debug("  Updating record: "+newobj);\r
-                                       newobj.doUpdate(conn);\r
-\r
-                                       /**Rally US708115\r
-                                        * Change Ownership of FEED - 1610, Syncronised with secondary DB.\r
-                                        * */\r
-                                       checkChnageOwner(newobj, oldobj);\r
-\r
-                                       changes = true;\r
-                               }\r
-                       }\r
-                       db.release(conn);\r
-               } catch (SQLException e) {\r
-                       logger.warn("PROV5009: problem during sync, exception: "+e);\r
-                       e.printStackTrace();\r
-               }\r
-               return changes;\r
-       }\r
-       private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {\r
-               Map<String, Syncable> map = new HashMap<String, Syncable>();\r
-               for (Syncable v : c) {\r
-                       map.put(v.getKey(), v);\r
-               }\r
-               return map;\r
-       }\r
-       \r
-\r
-       /**Change owner of FEED/SUBSCRIPTION*/\r
-       /**Rally US708115\r
-        * Change Ownership of FEED - 1610\r
-        * \r
-        * */\r
-       private void checkChnageOwner(Syncable newobj, Syncable oldobj) {\r
-               if(newobj instanceof Feed) {\r
-                       Feed oldfeed = (Feed) oldobj;\r
-                       Feed newfeed = (Feed) newobj;\r
-                       \r
-                       if(!oldfeed.getPublisher().equals(newfeed.getPublisher())){\r
-                               logger.info("PROV5013 -  Previous publisher: "+oldfeed.getPublisher() +": New publisher-"+newfeed.getPublisher());\r
-                               oldfeed.setPublisher(newfeed.getPublisher());\r
-                               oldfeed.changeOwnerShip();\r
-                       }\r
-               }\r
-               else if(newobj instanceof Subscription) {\r
-                       Subscription oldsub = (Subscription) oldobj;\r
-                       Subscription newsub = (Subscription) newobj;\r
-                       \r
-                       if(!oldsub.getSubscriber().equals(newsub.getSubscriber())){\r
-                               logger.info("PROV5013 -  Previous subscriber: "+oldsub.getSubscriber() +": New subscriber-"+newsub.getSubscriber());\r
-                               oldsub.setSubscriber(newsub.getSubscriber());\r
-                               oldsub.changeOwnerShip();\r
-                       }\r
-               }\r
-               \r
-       }\r
-\r
-       /**\r
-        * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.\r
-        * @return the provisioning data (as a JONObject)\r
-        */\r
-       private synchronized JSONObject readProvisioningJSON() {\r
-               String url  = URLUtilities.generatePeerProvURL();\r
-               HttpGet get = new HttpGet(url);\r
-               try {\r
-                       HttpResponse response = httpclient.execute(get);\r
-                       int code = response.getStatusLine().getStatusCode();\r
-                       if (code != HttpServletResponse.SC_OK) {\r
-                               logger.warn("PROV5010: readProvisioningJSON failed, bad error code: "+code);\r
-                               return null;\r
-                       }\r
-                       HttpEntity entity = response.getEntity();\r
-                       String ctype = entity.getContentType().getValue().trim();\r
-                       if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {\r
-                               logger.warn("PROV5011: readProvisioningJSON failed, bad content type: "+ctype);\r
-                               return null;\r
-                       }\r
-                       return new JSONObject(new JSONTokener(entity.getContent()));\r
-               } catch (Exception e) {\r
-                       logger.warn("PROV5012: readProvisioningJSON failed, exception: "+e);\r
-                       return null;\r
-               } finally {\r
-                       get.releaseConnection();\r
-               }\r
-       }\r
-       /**\r
-        * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the\r
-        * log records available in the remote database.\r
-        * @return the bitset\r
-        */\r
-       private RLEBitSet readRemoteLoglist() {\r
-               RLEBitSet bs = new RLEBitSet();\r
-               String url  = URLUtilities.generatePeerLogsURL();\r
-\r
-               //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.\r
-               if(url.equals("")) {\r
-                       return bs;\r
-               }\r
-               //End of fix.\r
-\r
-               HttpGet get = new HttpGet(url);\r
-               try {\r
-                       HttpResponse response = httpclient.execute(get);\r
-                       int code = response.getStatusLine().getStatusCode();\r
-                       if (code != HttpServletResponse.SC_OK) {\r
-                               logger.warn("PROV5010: readRemoteLoglist failed, bad error code: "+code);\r
-                               return bs;\r
-                       }\r
-                       HttpEntity entity = response.getEntity();\r
-                       String ctype = entity.getContentType().getValue().trim();\r
-                       if (!ctype.equals("text/plain")) {\r
-                               logger.warn("PROV5011: readRemoteLoglist failed, bad content type: "+ctype);\r
-                               return bs;\r
-                       }\r
-                       InputStream is = entity.getContent();\r
-                       ByteArrayOutputStream bos = new ByteArrayOutputStream();\r
-                       int ch = 0;\r
-                       while ((ch = is.read()) >= 0)\r
-                               bos.write(ch);\r
-                       bs.set(bos.toString());\r
-                       is.close();\r
-               } catch (Exception e) {\r
-                       logger.warn("PROV5012: readRemoteLoglist failed, exception: "+e);\r
-                       return bs;\r
-               } finally {\r
-                       get.releaseConnection();\r
-               }\r
-               return bs;\r
-       }\r
-       /**\r
-        * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available\r
-        * in the remote database that we wish to copy to the local database.\r
-        * @param bs the bitset (an RELBitSet) of log records to fetch\r
-        */\r
-       private void replicateDRLogs(RLEBitSet bs) {\r
-               String url  = URLUtilities.generatePeerLogsURL();\r
-               HttpPost post = new HttpPost(url);\r
-               try {\r
-                       String t = bs.toString();\r
-                       HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));\r
-                       post.setEntity(body);\r
-                       if (logger.isDebugEnabled())\r
-                               logger.debug("Requesting records: "+t);\r
-\r
-                       HttpResponse response = httpclient.execute(post);\r
-                       int code = response.getStatusLine().getStatusCode();\r
-                       if (code != HttpServletResponse.SC_OK) {\r
-                               logger.warn("PROV5010: replicateDRLogs failed, bad error code: "+code);\r
-                               return;\r
-                       }\r
-                       HttpEntity entity = response.getEntity();\r
-                       String ctype = entity.getContentType().getValue().trim();\r
-                       if (!ctype.equals("text/plain")) {\r
-                               logger.warn("PROV5011: replicateDRLogs failed, bad content type: "+ctype);\r
-                               return;\r
-                       }\r
-\r
-                       String spoolname = "" + System.currentTimeMillis();\r
-                       Path tmppath = Paths.get(spooldir, spoolname);\r
-                       Path donepath = Paths.get(spooldir, "IN."+spoolname);\r
-                       Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);\r
-                       Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);\r
-                       logger.info("Approximately "+bs.cardinality()+" records replicated.");\r
-               } catch (Exception e) {\r
-                       logger.warn("PROV5012: replicateDRLogs failed, exception: "+e);\r
-               } finally {\r
-                       post.releaseConnection();\r
-               }\r
-       }\r
-}\r
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ *  * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+
+package org.onap.dmaap.datarouter.provisioning;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.security.KeyStore;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.AbstractHttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
+import org.onap.dmaap.datarouter.provisioning.beans.Feed;
+import org.onap.dmaap.datarouter.provisioning.beans.Group;
+import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
+import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
+import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
+import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
+import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
+import org.onap.dmaap.datarouter.provisioning.utils.DB;
+import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
+import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
+import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
+
+/**
+ * This class handles synchronization between provisioning servers (PODs).  It has three primary functions:
+ * <ol>
+ * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
+ * the active (master) POD.</li>
+ * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
+ * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)
+ * of this POD.</li>
+ * </ol>
+ * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
+ * <code>
+ *         Security.setProperty("networkaddress.cache.ttl", "10");
+ * </code>
+ *
+ * @author Robert Eby
+ * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
+ */
+public class SynchronizerTask extends TimerTask {
+    /** This is a singleton -- there is only one SynchronizerTask object in the server */
+    private static SynchronizerTask synctask;
+
+    /** This POD is unknown -- not on the list of PODs */
+    public static final int UNKNOWN = 0;
+    /** This POD is active -- on the list of PODs, and the DNS CNAME points to us */
+    public static final int ACTIVE = 1;
+    /** This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us */
+    public static final int STANDBY = 2;
+    private static final String[] stnames = { "UNKNOWN", "ACTIVE", "STANDBY" };
+    private static final long ONE_HOUR = 60 * 60 * 1000L;
+
+    private final Logger logger;
+    private final Timer rolex;
+    private final String spooldir;
+    private int state;
+    private boolean doFetch;
+    private long nextsynctime;
+    private AbstractHttpClient httpclient = null;
+
+    /**
+     * Get the singleton SynchronizerTask object.
+     * @return the SynchronizerTask
+     */
+    public static synchronized SynchronizerTask getSynchronizer() {
+        if (synctask == null)
+            synctask = new SynchronizerTask();
+        return synctask;
+    }
+
+    @SuppressWarnings("deprecation")
+    private SynchronizerTask() {
+        logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");
+        rolex = new Timer();
+        spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
+        state = UNKNOWN;
+        doFetch = true;        // start off with a fetch
+        nextsynctime = 0;
+
+        logger.info("PROV5000: Sync task starting, server state is UNKNOWN");
+        try {
+            Properties props = (new DB()).getProperties();
+            String type  = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
+            String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
+            String pass  = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY);
+            KeyStore keyStore = KeyStore.getInstance(type);
+            FileInputStream instream = new FileInputStream(new File(store));
+            keyStore.load(instream, pass.toCharArray());
+            instream.close();
+
+            store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
+            pass  = props.getProperty(Main.TRUSTSTORE_PASSWORD_PROPERTY);
+            KeyStore trustStore = null;
+            if (store != null && store.length() > 0) {
+                trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+                instream = new FileInputStream(new File(store));
+                trustStore.load(instream, pass.toCharArray());
+                instream.close();
+            }
+
+            // We are connecting with the node name, but the certificate will have the CNAME
+            // So we need to accept a non-matching certificate name
+            String keystorepass  = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref
+            AbstractHttpClient hc = new DefaultHttpClient();
+            SSLSocketFactory socketFactory =
+                (trustStore == null)
+                ? new SSLSocketFactory(keyStore, keystorepass)
+                : new SSLSocketFactory(keyStore, keystorepass, trustStore);
+            socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+            Scheme sch = new Scheme("https", 443, socketFactory);
+            hc.getConnectionManager().getSchemeRegistry().register(sch);
+            httpclient = hc;
+
+            // Run once every 5 seconds to check DNS, etc.
+            long interval = 0;
+            try {
+                String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
+                interval = Long.parseLong(s);
+            } catch (NumberFormatException e) {
+                interval = 5000L;
+            }
+            rolex.scheduleAtFixedRate(this, 0L, interval);
+        } catch (Exception e) {
+            logger.warn("PROV5005: Problem starting the synchronizer: "+e);
+        }
+    }
+
+    /**
+     * What is the state of this POD?
+     * @return one of ACTIVE, STANDBY, UNKNOWN
+     */
+    public int getState() {
+        return state;
+    }
+
+    /**
+     * Is this the active POD?
+     * @return true if we are active (the master), false otherwise
+     */
+    public boolean isActive() {
+        return state == ACTIVE;
+    }
+
+    /**
+     * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request,
+     * and that we should re-synchronize with the master.
+     */
+    public void doFetch() {
+        doFetch = true;
+    }
+
+    /**
+     * Runs once a minute in order to <ol>
+     * <li>lookup DNS names,</li>
+     * <li>determine the state of this POD,</li>
+     * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>
+     * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
+     * </ol>
+     */
+    @Override
+    public void run() {
+        try {
+            state = lookupState();
+            if (state == STANDBY) {
+                // Only copy provisioning data FROM the active server TO the standby
+                if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
+                    logger.debug("Initiating a sync...");
+                    JSONObject jo = readProvisioningJSON();
+                    if (jo != null) {
+                        doFetch = false;
+                        syncFeeds( jo.getJSONArray("feeds"));
+                        syncSubs(  jo.getJSONArray("subscriptions"));
+                        syncGroups(  jo.getJSONArray("groups")); //Rally:US708115 - 1610
+                        syncParams(jo.getJSONObject("parameters"));
+                        // The following will not be present in a version=1.0 provfeed
+                        JSONArray ja = jo.optJSONArray("ingress");
+                        if (ja != null)
+                            syncIngressRoutes(ja);
+                        JSONObject j2 = jo.optJSONObject("egress");
+                        if (j2 != null)
+                            syncEgressRoutes( j2);
+                        ja = jo.optJSONArray("routing");
+                        if (ja != null)
+                            syncNetworkRoutes(ja);
+                    }
+                    logger.info("PROV5013: Sync completed.");
+                    nextsynctime = System.currentTimeMillis() + ONE_HOUR;
+                }
+            } else {
+                // Don't do fetches on non-standby PODs
+                doFetch = false;
+            }
+
+            // Fetch DR logs as needed - server to server
+            LogfileLoader lfl = LogfileLoader.getLoader();
+            if (lfl.isIdle()) {
+                // Only fetch new logs if the loader is waiting for them.
+                logger.trace("Checking for logs to replicate...");
+                RLEBitSet local  = lfl.getBitSet();
+                RLEBitSet remote = readRemoteLoglist();
+                remote.andNot(local);
+                if (!remote.isEmpty()) {
+                    logger.debug(" Replicating logs: "+remote);
+                    replicateDRLogs(remote);
+                }
+            }
+        } catch (Exception e) {
+            logger.warn("PROV0020: Caught exception in SynchronizerTask: "+e);
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * This method is used to lookup the CNAME that points to the active server.
+     * It returns 0 (UNKNOWN), 1(ACTIVE), or 2 (STANDBY) to indicate the state of this server.
+     * @return the current state
+     */
+    private int lookupState() {
+        int newstate = UNKNOWN;
+        try {
+            InetAddress myaddr = InetAddress.getLocalHost();
+            if (logger.isTraceEnabled())
+                logger.trace("My address: "+myaddr);
+            String this_pod = myaddr.getHostName();
+            Set<String> pods = new TreeSet<String>(Arrays.asList(BaseServlet.getPods()));
+            if (pods.contains(this_pod)) {
+                InetAddress pserver = InetAddress.getByName(BaseServlet.active_prov_name);
+                newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;
+                if (logger.isDebugEnabled() && System.currentTimeMillis() >= next_msg) {
+                    logger.debug("Active POD = "+pserver+", Current state is "+stnames[newstate]);
+                    next_msg = System.currentTimeMillis() + (5 * 60 * 1000L);
+                }
+            } else {
+                logger.warn("PROV5003: My name ("+this_pod+") is missing from the list of provisioning servers.");
+            }
+        } catch (UnknownHostException e) {
+            logger.warn("PROV5002: Cannot determine the name of this provisioning server.");
+        }
+
+        if (newstate != state)
+            logger.info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));
+        return newstate;
+    }
+    private static long next_msg = 0;    // only display the "Current state" msg every 5 mins.
+    /** Synchronize the Feeds in the JSONArray, with the Feeds in the DB. */
+    private void syncFeeds(JSONArray ja) {
+        Collection<Syncable> coll = new ArrayList<Syncable>();
+        for (int n = 0; n < ja.length(); n++) {
+            try {
+                Feed f = new Feed(ja.getJSONObject(n));
+                coll.add(f);
+            } catch (Exception e) {
+                logger.warn("PROV5004: Invalid object in feed: "+ja.optJSONObject(n));
+            }
+        }
+        if (sync(coll, Feed.getAllFeeds()))
+            BaseServlet.provisioningDataChanged();
+    }
+    /** Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB. */
+    private void syncSubs(JSONArray ja) {
+        Collection<Syncable> coll = new ArrayList<Syncable>();
+        for (int n = 0; n < ja.length(); n++) {
+            try {
+                //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
+                JSONObject j = ja.getJSONObject(n);
+                j.put("sync", "true");
+                Subscription s = new Subscription(j);
+                coll.add(s);
+            } catch (Exception e) {
+                logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));
+            }
+        }
+        if (sync(coll, Subscription.getAllSubscriptions()))
+            BaseServlet.provisioningDataChanged();
+    }
+
+    /**  Rally:US708115  - Synchronize the Groups in the JSONArray, with the Groups in the DB. */
+    private void syncGroups(JSONArray ja) {
+        Collection<Syncable> coll = new ArrayList<Syncable>();
+        for (int n = 0; n < ja.length(); n++) {
+            try {
+                Group g = new Group(ja.getJSONObject(n));
+                coll.add(g);
+            } catch (Exception e) {
+                logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));
+            }
+        }
+        if (sync(coll, Group.getAllgroups()))
+            BaseServlet.provisioningDataChanged();
+    }
+
+
+    /** Synchronize the Parameters in the JSONObject, with the Parameters in the DB. */
+    private void syncParams(JSONObject jo) {
+        Collection<Syncable> coll = new ArrayList<Syncable>();
+        for (String k : jo.keySet()) {
+            String v = "";
+            try {
+                v = jo.getString(k);
+            } catch (JSONException e) {
+                try {
+                    v = ""+jo.getInt(k);
+                } catch (JSONException e1) {
+                    JSONArray ja = jo.getJSONArray(k);
+                    for (int i = 0; i < ja.length(); i++) {
+                        if (i > 0)
+                            v += "|";
+                        v += ja.getString(i);
+                    }
+                }
+            }
+            coll.add(new Parameters(k, v));
+        }
+        if (sync(coll, Parameters.getParameterCollection())) {
+            BaseServlet.provisioningDataChanged();
+            BaseServlet.provisioningParametersChanged();
+        }
+    }
+    private void syncIngressRoutes(JSONArray ja) {
+        Collection<Syncable> coll = new ArrayList<Syncable>();
+        for (int n = 0; n < ja.length(); n++) {
+            try {
+                IngressRoute in = new IngressRoute(ja.getJSONObject(n));
+                coll.add(in);
+            } catch (NumberFormatException e) {
+                logger.warn("PROV5004: Invalid object in ingress routes: "+ja.optJSONObject(n));
+            }
+        }
+        if (sync(coll, IngressRoute.getAllIngressRoutes()))
+            BaseServlet.provisioningDataChanged();
+    }
+    private void syncEgressRoutes(JSONObject jo) {
+        Collection<Syncable> coll = new ArrayList<Syncable>();
+        for (String key : jo.keySet()) {
+            try {
+                int sub = Integer.parseInt(key);
+                String node = jo.getString(key);
+                EgressRoute er = new EgressRoute(sub, node);
+                coll.add(er);
+            } catch (NumberFormatException e) {
+                logger.warn("PROV5004: Invalid subid in egress routes: "+key);
+            } catch (IllegalArgumentException e) {
+                logger.warn("PROV5004: Invalid node name in egress routes: "+key);
+            }
+        }
+        if (sync(coll, EgressRoute.getAllEgressRoutes()))
+            BaseServlet.provisioningDataChanged();
+    }
+    private void syncNetworkRoutes(JSONArray ja) {
+        Collection<Syncable> coll = new ArrayList<Syncable>();
+        for (int n = 0; n < ja.length(); n++) {
+            try {
+                NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
+                coll.add(nr);
+            } catch (JSONException e) {
+                logger.warn("PROV5004: Invalid object in network routes: "+ja.optJSONObject(n));
+            }
+        }
+        if (sync(coll, NetworkRoute.getAllNetworkRoutes()))
+            BaseServlet.provisioningDataChanged();
+    }
+    private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
+        boolean changes = false;
+        try {
+            Map<String, Syncable> newmap = getMap(newc);
+            Map<String, Syncable> oldmap = getMap(oldc);
+            Set<String> union = new TreeSet<String>(newmap.keySet());
+            union.addAll(oldmap.keySet());
+            DB db = new DB();
+            @SuppressWarnings("resource")
+            Connection conn = db.getConnection();
+            for (String n : union) {
+                Syncable newobj = newmap.get(n);
+                Syncable oldobj = oldmap.get(n);
+                if (oldobj == null) {
+                    if (logger.isDebugEnabled())
+                        logger.debug("  Inserting record: "+newobj);
+                    newobj.doInsert(conn);
+                    changes = true;
+                } else if (newobj == null) {
+                    if (logger.isDebugEnabled())
+                        logger.debug("  Deleting record: "+oldobj);
+                    oldobj.doDelete(conn);
+                    changes = true;
+                } else if (!newobj.equals(oldobj)) {
+                    if (logger.isDebugEnabled())
+                        logger.debug("  Updating record: "+newobj);
+                    newobj.doUpdate(conn);
+
+                    /**Rally US708115
+                     * Change Ownership of FEED - 1610, Syncronised with secondary DB.
+                     * */
+                    checkChnageOwner(newobj, oldobj);
+
+                    changes = true;
+                }
+            }
+            db.release(conn);
+        } catch (SQLException e) {
+            logger.warn("PROV5009: problem during sync, exception: "+e);
+            e.printStackTrace();
+        }
+        return changes;
+    }
+    private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {
+        Map<String, Syncable> map = new HashMap<String, Syncable>();
+        for (Syncable v : c) {
+            map.put(v.getKey(), v);
+        }
+        return map;
+    }
+
+
+    /**Change owner of FEED/SUBSCRIPTION*/
+    /**Rally US708115
+     * Change Ownership of FEED - 1610
+     *
+     * */
+    private void checkChnageOwner(Syncable newobj, Syncable oldobj) {
+        if(newobj instanceof Feed) {
+            Feed oldfeed = (Feed) oldobj;
+            Feed newfeed = (Feed) newobj;
+
+            if(!oldfeed.getPublisher().equals(newfeed.getPublisher())){
+                logger.info("PROV5013 -  Previous publisher: "+oldfeed.getPublisher() +": New publisher-"+newfeed.getPublisher());
+                oldfeed.setPublisher(newfeed.getPublisher());
+                oldfeed.changeOwnerShip();
+            }
+        }
+        else if(newobj instanceof Subscription) {
+            Subscription oldsub = (Subscription) oldobj;
+            Subscription newsub = (Subscription) newobj;
+
+            if(!oldsub.getSubscriber().equals(newsub.getSubscriber())){
+                logger.info("PROV5013 -  Previous subscriber: "+oldsub.getSubscriber() +": New subscriber-"+newsub.getSubscriber());
+                oldsub.setSubscriber(newsub.getSubscriber());
+                oldsub.changeOwnerShip();
+            }
+        }
+
+    }
+
+    /**
+     * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
+     * @return the provisioning data (as a JONObject)
+     */
+    private synchronized JSONObject readProvisioningJSON() {
+        String url  = URLUtilities.generatePeerProvURL();
+        HttpGet get = new HttpGet(url);
+        try {
+            HttpResponse response = httpclient.execute(get);
+            int code = response.getStatusLine().getStatusCode();
+            if (code != HttpServletResponse.SC_OK) {
+                logger.warn("PROV5010: readProvisioningJSON failed, bad error code: "+code);
+                return null;
+            }
+            HttpEntity entity = response.getEntity();
+            String ctype = entity.getContentType().getValue().trim();
+            if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
+                logger.warn("PROV5011: readProvisioningJSON failed, bad content type: "+ctype);
+                return null;
+            }
+            return new JSONObject(new JSONTokener(entity.getContent()));
+        } catch (Exception e) {
+            logger.warn("PROV5012: readProvisioningJSON failed, exception: "+e);
+            return null;
+        } finally {
+            get.releaseConnection();
+        }
+    }
+    /**
+     * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the
+     * log records available in the remote database.
+     * @return the bitset
+     */
+    private RLEBitSet readRemoteLoglist() {
+        RLEBitSet bs = new RLEBitSet();
+        String url  = URLUtilities.generatePeerLogsURL();
+
+        //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
+        if(url.equals("")) {
+            return bs;
+        }
+        //End of fix.
+
+        HttpGet get = new HttpGet(url);
+        try {
+            HttpResponse response = httpclient.execute(get);
+            int code = response.getStatusLine().getStatusCode();
+            if (code != HttpServletResponse.SC_OK) {
+                logger.warn("PROV5010: readRemoteLoglist failed, bad error code: "+code);
+                return bs;
+            }
+            HttpEntity entity = response.getEntity();
+            String ctype = entity.getContentType().getValue().trim();
+            if (!ctype.equals("text/plain")) {
+                logger.warn("PROV5011: readRemoteLoglist failed, bad content type: "+ctype);
+                return bs;
+            }
+            InputStream is = entity.getContent();
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            int ch = 0;
+            while ((ch = is.read()) >= 0)
+                bos.write(ch);
+            bs.set(bos.toString());
+            is.close();
+        } catch (Exception e) {
+            logger.warn("PROV5012: readRemoteLoglist failed, exception: "+e);
+            return bs;
+        } finally {
+            get.releaseConnection();
+        }
+        return bs;
+    }
+    /**
+     * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available
+     * in the remote database that we wish to copy to the local database.
+     * @param bs the bitset (an RELBitSet) of log records to fetch
+     */
+    private void replicateDRLogs(RLEBitSet bs) {
+        String url  = URLUtilities.generatePeerLogsURL();
+        HttpPost post = new HttpPost(url);
+        try {
+            String t = bs.toString();
+            HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));
+            post.setEntity(body);
+            if (logger.isDebugEnabled())
+                logger.debug("Requesting records: "+t);
+
+            HttpResponse response = httpclient.execute(post);
+            int code = response.getStatusLine().getStatusCode();
+            if (code != HttpServletResponse.SC_OK) {
+                logger.warn("PROV5010: replicateDRLogs failed, bad error code: "+code);
+                return;
+            }
+            HttpEntity entity = response.getEntity();
+            String ctype = entity.getContentType().getValue().trim();
+            if (!ctype.equals("text/plain")) {
+                logger.warn("PROV5011: replicateDRLogs failed, bad content type: "+ctype);
+                return;
+            }
+
+            String spoolname = "" + System.currentTimeMillis();
+            Path tmppath = Paths.get(spooldir, spoolname);
+            Path donepath = Paths.get(spooldir, "IN."+spoolname);
+            Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
+            Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
+            logger.info("Approximately "+bs.cardinality()+" records replicated.");
+        } catch (Exception e) {
+            logger.warn("PROV5012: replicateDRLogs failed, exception: "+e);
+        } finally {
+            post.releaseConnection();
+        }
+    }
+}