package org.onap.dmaap.datarouter.provisioning;
+import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.AbstractHttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
* <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
* the active (master) POD.</li>
* <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
- * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)
+ * <li>Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD)
* of this POD.</li>
* </ol>
* <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
* <code>
- * Security.setProperty("networkaddress.cache.ttl", "10");
+ * Security.setProperty("networkaddress.cache.ttl", "10");
* </code>
*
* @author Robert Eby
* @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
*/
+
public class SynchronizerTask extends TimerTask {
- /** This is a singleton -- there is only one SynchronizerTask object in the server */
+
+ /**
+ * This is a singleton -- there is only one SynchronizerTask object in the server.
+ */
private static SynchronizerTask synctask;
- /** This POD is unknown -- not on the list of PODs */
- public static final int UNKNOWN = 0;
- /** This POD is active -- on the list of PODs, and the DNS CNAME points to us */
- public static final int ACTIVE = 1;
- /** This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us */
- public static final int STANDBY = 2;
- private static final String[] stnames = { "UNKNOWN", "ACTIVE", "STANDBY" };
+ /**
+ * This POD is unknown -- not on the list of PODs.
+ */
+ public static final int UNKNOWN_POD = 0;
+ /**
+ * This POD is active -- on the list of PODs, and the DNS CNAME points to us.
+ */
+ public static final int ACTIVE_POD = 1;
+ /**
+ * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us.
+ */
+ public static final int STANDBY_POD = 2;
+
+ private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"};
private static final long ONE_HOUR = 60 * 60 * 1000L;
- private final Logger logger;
+ private long nextMsg = 0; // only display the "Current podState" msg every 5 mins.
+
+ private final EELFLogger logger;
private final Timer rolex;
private final String spooldir;
- private int state;
+ private int podState;
private boolean doFetch;
private long nextsynctime;
private AbstractHttpClient httpclient = null;
- /**
- * Get the singleton SynchronizerTask object.
- * @return the SynchronizerTask
- */
- public static synchronized SynchronizerTask getSynchronizer() {
- if (synctask == null)
- synctask = new SynchronizerTask();
- return synctask;
- }
-
@SuppressWarnings("deprecation")
private SynchronizerTask() {
- logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");
+ logger = EELFManager.getInstance().getLogger("InternalLog");
rolex = new Timer();
spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
- state = UNKNOWN;
+ podState = UNKNOWN_POD;
doFetch = true; // start off with a fetch
nextsynctime = 0;
- logger.info("PROV5000: Sync task starting, server state is UNKNOWN");
+ logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD");
try {
Properties props = (new DB()).getProperties();
- String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
+ String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
- String pass = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY);
+ String pass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
KeyStore keyStore = KeyStore.getInstance(type);
- FileInputStream instream = new FileInputStream(new File(store));
- keyStore.load(instream, pass.toCharArray());
- instream.close();
+ try (FileInputStream instream = new FileInputStream(new File(store))) {
+ keyStore.load(instream, pass.toCharArray());
+ }
store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
- pass = props.getProperty(Main.TRUSTSTORE_PASSWORD_PROPERTY);
+ pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY);
KeyStore trustStore = null;
if (store != null && store.length() > 0) {
trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
- instream = new FileInputStream(new File(store));
- trustStore.load(instream, pass.toCharArray());
- instream.close();
+ try (FileInputStream instream = new FileInputStream(new File(store))) {
+ trustStore.load(instream, pass.toCharArray());
+
+ }
}
// We are connecting with the node name, but the certificate will have the CNAME
// So we need to accept a non-matching certificate name
- String keystorepass = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref
- AbstractHttpClient hc = new DefaultHttpClient();
- SSLSocketFactory socketFactory =
- (trustStore == null)
- ? new SSLSocketFactory(keyStore, keystorepass)
- : new SSLSocketFactory(keyStore, keystorepass, trustStore);
- socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
- Scheme sch = new Scheme("https", 443, socketFactory);
- hc.getConnectionManager().getSchemeRegistry().register(sch);
- httpclient = hc;
-
- // Run once every 5 seconds to check DNS, etc.
- long interval = 0;
- try {
- String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
- interval = Long.parseLong(s);
- } catch (NumberFormatException e) {
- interval = 5000L;
+ String keystorepass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
+ try (AbstractHttpClient hc = new DefaultHttpClient()) {
+ SSLSocketFactory socketFactory =
+ (trustStore == null)
+ ? new SSLSocketFactory(keyStore, keystorepass)
+ : new SSLSocketFactory(keyStore, keystorepass, trustStore);
+ socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+ Scheme sch = new Scheme("https", 443, socketFactory);
+ hc.getConnectionManager().getSchemeRegistry().register(sch);
+ httpclient = hc;
}
- rolex.scheduleAtFixedRate(this, 0L, interval);
+ setSynchTimer(props);
} catch (Exception e) {
- logger.warn("PROV5005: Problem starting the synchronizer: "+e);
+ logger.warn("PROV5005: Problem starting the synchronizer: " + e);
}
}
+ private void setSynchTimer(Properties props) {
+ // Run once every 5 seconds to check DNS, etc.
+ long interval;
+ try {
+ String str = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
+ interval = Long.parseLong(str);
+ } catch (NumberFormatException e) {
+ interval = 5000L;
+ }
+ rolex.scheduleAtFixedRate(this, 0L, interval);
+ }
+
/**
- * What is the state of this POD?
- * @return one of ACTIVE, STANDBY, UNKNOWN
+ * Get the singleton SynchronizerTask object.
+ *
+ * @return the SynchronizerTask
*/
- public int getState() {
- return state;
+ public static synchronized SynchronizerTask getSynchronizer() {
+ if (synctask == null) {
+ synctask = new SynchronizerTask();
+ }
+ return synctask;
}
/**
- * Is this the active POD?
+ * What is the podState of this POD?.
+ *
+ * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD
+ */
+ public int getPodState() {
+ return podState;
+ }
+
+ /**
+ * Is this the active POD?.
+ *
* @return true if we are active (the master), false otherwise
*/
public boolean isActive() {
- return state == ACTIVE;
+ return podState == ACTIVE_POD;
}
/**
- * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request,
- * and that we should re-synchronize with the master.
+ * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
+ * should re-synchronize with the master.
*/
- public void doFetch() {
+ void doFetch() {
doFetch = true;
}
/**
* Runs once a minute in order to <ol>
* <li>lookup DNS names,</li>
- * <li>determine the state of this POD,</li>
- * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>
+ * <li>determine the podState of this POD,</li>
+ * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.</li>
* <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
- * </ol>
+ * </ol>.
*/
@Override
public void run() {
try {
- state = lookupState();
- if (state == STANDBY) {
+ podState = lookupState();
+ if (podState == STANDBY_POD) {
// Only copy provisioning data FROM the active server TO the standby
if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
- logger.debug("Initiating a sync...");
- JSONObject jo = readProvisioningJSON();
- if (jo != null) {
- doFetch = false;
- syncFeeds( jo.getJSONArray("feeds"));
- syncSubs( jo.getJSONArray("subscriptions"));
- syncGroups( jo.getJSONArray("groups")); //Rally:US708115 - 1610
- syncParams(jo.getJSONObject("parameters"));
- // The following will not be present in a version=1.0 provfeed
- JSONArray ja = jo.optJSONArray("ingress");
- if (ja != null)
- syncIngressRoutes(ja);
- JSONObject j2 = jo.optJSONObject("egress");
- if (j2 != null)
- syncEgressRoutes( j2);
- ja = jo.optJSONArray("routing");
- if (ja != null)
- syncNetworkRoutes(ja);
- }
+ syncProvisioningData();
logger.info("PROV5013: Sync completed.");
nextsynctime = System.currentTimeMillis() + ONE_HOUR;
}
if (lfl.isIdle()) {
// Only fetch new logs if the loader is waiting for them.
logger.trace("Checking for logs to replicate...");
- RLEBitSet local = lfl.getBitSet();
+ RLEBitSet local = lfl.getBitSet();
RLEBitSet remote = readRemoteLoglist();
remote.andNot(local);
if (!remote.isEmpty()) {
- logger.debug(" Replicating logs: "+remote);
- replicateDRLogs(remote);
+ logger.debug(" Replicating logs: " + remote);
+ replicateDataRouterLogs(remote);
}
}
} catch (Exception e) {
- logger.warn("PROV0020: Caught exception in SynchronizerTask: "+e);
- e.printStackTrace();
+ logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e);
+ }
+ }
+
+ private void syncProvisioningData() {
+ logger.debug("Initiating a sync...");
+ JSONObject jo = readProvisioningJson();
+ if (jo != null) {
+ doFetch = false;
+ syncFeeds(jo.getJSONArray("feeds"));
+ syncSubs(jo.getJSONArray("subscriptions"));
+ syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
+ syncParams(jo.getJSONObject("parameters"));
+ // The following will not be present in a version=1.0 provfeed
+ JSONArray ja = jo.optJSONArray("ingress");
+ if (ja != null) {
+ syncIngressRoutes(ja);
+ }
+ JSONObject j2 = jo.optJSONObject("egress");
+ if (j2 != null) {
+ syncEgressRoutes(j2);
+ }
+ ja = jo.optJSONArray("routing");
+ if (ja != null) {
+ syncNetworkRoutes(ja);
+ }
}
}
/**
* This method is used to lookup the CNAME that points to the active server.
- * It returns 0 (UNKNOWN), 1(ACTIVE), or 2 (STANDBY) to indicate the state of this server.
- * @return the current state
+ * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server.
+ *
+ * @return the current podState
*/
- private int lookupState() {
- int newstate = UNKNOWN;
+ int lookupState() {
+ int newPodState = UNKNOWN_POD;
try {
InetAddress myaddr = InetAddress.getLocalHost();
- if (logger.isTraceEnabled())
- logger.trace("My address: "+myaddr);
- String this_pod = myaddr.getHostName();
- Set<String> pods = new TreeSet<String>(Arrays.asList(BaseServlet.getPods()));
- if (pods.contains(this_pod)) {
- InetAddress pserver = InetAddress.getByName(BaseServlet.active_prov_name);
- newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;
- if (logger.isDebugEnabled() && System.currentTimeMillis() >= next_msg) {
- logger.debug("Active POD = "+pserver+", Current state is "+stnames[newstate]);
- next_msg = System.currentTimeMillis() + (5 * 60 * 1000L);
+ if (logger.isTraceEnabled()) {
+ logger.trace("My address: " + myaddr);
+ }
+ String thisPod = myaddr.getHostName();
+ Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
+ if (pods.contains(thisPod)) {
+ InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
+ newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD;
+ if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
+ logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]);
+ nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
}
} else {
- logger.warn("PROV5003: My name ("+this_pod+") is missing from the list of provisioning servers.");
+ logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers.");
}
} catch (UnknownHostException e) {
- logger.warn("PROV5002: Cannot determine the name of this provisioning server.");
+ logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
}
- if (newstate != state)
- logger.info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));
- return newstate;
+ if (newPodState != podState) {
+ logger.info(String.format("PROV5001: Server podState changed from %s to %s",
+ stnames[podState], stnames[newPodState]));
+ }
+ return newPodState;
}
- private static long next_msg = 0; // only display the "Current state" msg every 5 mins.
- /** Synchronize the Feeds in the JSONArray, with the Feeds in the DB. */
+
+ /**
+ * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
+ */
private void syncFeeds(JSONArray ja) {
- Collection<Syncable> coll = new ArrayList<Syncable>();
+ Collection<Syncable> coll = new ArrayList<>();
for (int n = 0; n < ja.length(); n++) {
try {
- Feed f = new Feed(ja.getJSONObject(n));
- coll.add(f);
+ Feed feed = new Feed(ja.getJSONObject(n));
+ coll.add(feed);
} catch (Exception e) {
- logger.warn("PROV5004: Invalid object in feed: "+ja.optJSONObject(n));
+ logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n), e);
}
}
- if (sync(coll, Feed.getAllFeeds()))
+ if (sync(coll, Feed.getAllFeeds())) {
BaseServlet.provisioningDataChanged();
+ }
}
- /** Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB. */
+
+ /**
+ * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB.
+ */
private void syncSubs(JSONArray ja) {
- Collection<Syncable> coll = new ArrayList<Syncable>();
+ Collection<Syncable> coll = new ArrayList<>();
for (int n = 0; n < ja.length(); n++) {
try {
//Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
- JSONObject j = ja.getJSONObject(n);
- j.put("sync", "true");
- Subscription s = new Subscription(j);
- coll.add(s);
+ JSONObject jsonObject = ja.getJSONObject(n);
+ jsonObject.put("sync", "true");
+ Subscription sub = new Subscription(jsonObject);
+ coll.add(sub);
} catch (Exception e) {
- logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));
+ logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n), e);
}
}
- if (sync(coll, Subscription.getAllSubscriptions()))
+ if (sync(coll, Subscription.getAllSubscriptions())) {
BaseServlet.provisioningDataChanged();
+ }
}
- /** Rally:US708115 - Synchronize the Groups in the JSONArray, with the Groups in the DB. */
+ /**
+ * Rally:US708115 - Synchronize the Groups in the JSONArray, with the Groups in the DB.
+ */
private void syncGroups(JSONArray ja) {
- Collection<Syncable> coll = new ArrayList<Syncable>();
+ Collection<Syncable> coll = new ArrayList<>();
for (int n = 0; n < ja.length(); n++) {
try {
- Group g = new Group(ja.getJSONObject(n));
- coll.add(g);
+ Group group = new Group(ja.getJSONObject(n));
+ coll.add(group);
} catch (Exception e) {
- logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n));
+ logger.warn("PROV5004: Invalid object in group: " + ja.optJSONObject(n), e);
}
}
- if (sync(coll, Group.getAllgroups()))
+ if (sync(coll, Group.getAllgroups())) {
BaseServlet.provisioningDataChanged();
+ }
}
- /** Synchronize the Parameters in the JSONObject, with the Parameters in the DB. */
+ /**
+ * Synchronize the Parameters in the JSONObject, with the Parameters in the DB.
+ */
private void syncParams(JSONObject jo) {
- Collection<Syncable> coll = new ArrayList<Syncable>();
+ Collection<Syncable> coll = new ArrayList<>();
for (String k : jo.keySet()) {
- String v = "";
+ String val = "";
try {
- v = jo.getString(k);
+ val = jo.getString(k);
} catch (JSONException e) {
+ logger.warn("PROV5004: Invalid object in parameters: " + jo.optJSONObject(k), e);
try {
- v = ""+jo.getInt(k);
+ val = "" + jo.getInt(k);
} catch (JSONException e1) {
+ logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1);
JSONArray ja = jo.getJSONArray(k);
for (int i = 0; i < ja.length(); i++) {
- if (i > 0)
- v += "|";
- v += ja.getString(i);
+ if (i > 0) {
+ val += "|";
+ }
+ val += ja.getString(i);
}
}
}
- coll.add(new Parameters(k, v));
+ coll.add(new Parameters(k, val));
}
if (sync(coll, Parameters.getParameterCollection())) {
BaseServlet.provisioningDataChanged();
BaseServlet.provisioningParametersChanged();
}
}
+
private void syncIngressRoutes(JSONArray ja) {
- Collection<Syncable> coll = new ArrayList<Syncable>();
+ Collection<Syncable> coll = new ArrayList<>();
for (int n = 0; n < ja.length(); n++) {
try {
IngressRoute in = new IngressRoute(ja.getJSONObject(n));
coll.add(in);
} catch (NumberFormatException e) {
- logger.warn("PROV5004: Invalid object in ingress routes: "+ja.optJSONObject(n));
+ logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n));
}
}
- if (sync(coll, IngressRoute.getAllIngressRoutes()))
+ if (sync(coll, IngressRoute.getAllIngressRoutes())) {
BaseServlet.provisioningDataChanged();
+ }
}
+
private void syncEgressRoutes(JSONObject jo) {
- Collection<Syncable> coll = new ArrayList<Syncable>();
+ Collection<Syncable> coll = new ArrayList<>();
for (String key : jo.keySet()) {
try {
int sub = Integer.parseInt(key);
EgressRoute er = new EgressRoute(sub, node);
coll.add(er);
} catch (NumberFormatException e) {
- logger.warn("PROV5004: Invalid subid in egress routes: "+key);
+ logger.warn("PROV5004: Invalid subid in egress routes: " + key, e);
} catch (IllegalArgumentException e) {
- logger.warn("PROV5004: Invalid node name in egress routes: "+key);
+ logger.warn("PROV5004: Invalid node name in egress routes: " + key, e);
}
}
- if (sync(coll, EgressRoute.getAllEgressRoutes()))
+ if (sync(coll, EgressRoute.getAllEgressRoutes())) {
BaseServlet.provisioningDataChanged();
+ }
}
+
private void syncNetworkRoutes(JSONArray ja) {
- Collection<Syncable> coll = new ArrayList<Syncable>();
+ Collection<Syncable> coll = new ArrayList<>();
for (int n = 0; n < ja.length(); n++) {
try {
NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
coll.add(nr);
} catch (JSONException e) {
- logger.warn("PROV5004: Invalid object in network routes: "+ja.optJSONObject(n));
+ logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e);
}
}
- if (sync(coll, NetworkRoute.getAllNetworkRoutes()))
+ if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
BaseServlet.provisioningDataChanged();
+ }
}
+
private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
boolean changes = false;
try {
Map<String, Syncable> newmap = getMap(newc);
Map<String, Syncable> oldmap = getMap(oldc);
- Set<String> union = new TreeSet<String>(newmap.keySet());
+ Set<String> union = new TreeSet<>(newmap.keySet());
union.addAll(oldmap.keySet());
DB db = new DB();
@SuppressWarnings("resource")
Syncable newobj = newmap.get(n);
Syncable oldobj = oldmap.get(n);
if (oldobj == null) {
- if (logger.isDebugEnabled())
- logger.debug(" Inserting record: "+newobj);
- newobj.doInsert(conn);
- changes = true;
+ changes = insertRecord(conn, newobj);
} else if (newobj == null) {
- if (logger.isDebugEnabled())
- logger.debug(" Deleting record: "+oldobj);
- oldobj.doDelete(conn);
- changes = true;
+ changes = deleteRecord(conn, oldobj);
} else if (!newobj.equals(oldobj)) {
- if (logger.isDebugEnabled())
- logger.debug(" Updating record: "+newobj);
- newobj.doUpdate(conn);
-
- /**Rally US708115
- * Change Ownership of FEED - 1610, Syncronised with secondary DB.
- * */
- checkChnageOwner(newobj, oldobj);
-
- changes = true;
+ changes = updateRecord(conn, newobj, oldobj);
}
}
db.release(conn);
} catch (SQLException e) {
- logger.warn("PROV5009: problem during sync, exception: "+e);
- e.printStackTrace();
+ logger.warn("PROV5009: problem during sync, exception: " + e);
}
return changes;
}
- private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {
- Map<String, Syncable> map = new HashMap<String, Syncable>();
- for (Syncable v : c) {
+
+ private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Updating record: " + newobj);
+ }
+ boolean changes = newobj.doUpdate(conn);
+ checkChangeOwner(newobj, oldobj);
+
+ return changes;
+ }
+
+ private boolean deleteRecord(Connection conn, Syncable oldobj) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Deleting record: " + oldobj);
+ }
+ return oldobj.doDelete(conn);
+ }
+
+ private boolean insertRecord(Connection conn, Syncable newobj) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Inserting record: " + newobj);
+ }
+ return newobj.doInsert(conn);
+ }
+
+ private Map<String, Syncable> getMap(Collection<? extends Syncable> coll) {
+ Map<String, Syncable> map = new HashMap<>();
+ for (Syncable v : coll) {
map.put(v.getKey(), v);
}
return map;
}
-
- /**Change owner of FEED/SUBSCRIPTION*/
- /**Rally US708115
- * Change Ownership of FEED - 1610
- *
- * */
- private void checkChnageOwner(Syncable newobj, Syncable oldobj) {
- if(newobj instanceof Feed) {
+ /**
+ * Change owner of FEED/SUBSCRIPTION.
+ * Rally US708115 Change Ownership of FEED - 1610
+ */
+ private void checkChangeOwner(Syncable newobj, Syncable oldobj) {
+ if (newobj instanceof Feed) {
Feed oldfeed = (Feed) oldobj;
Feed newfeed = (Feed) newobj;
- if(!oldfeed.getPublisher().equals(newfeed.getPublisher())){
- logger.info("PROV5013 - Previous publisher: "+oldfeed.getPublisher() +": New publisher-"+newfeed.getPublisher());
+ if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
+ logger.info("PROV5013 - Previous publisher: "
+ + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher());
oldfeed.setPublisher(newfeed.getPublisher());
oldfeed.changeOwnerShip();
}
- }
- else if(newobj instanceof Subscription) {
+ } else if (newobj instanceof Subscription) {
Subscription oldsub = (Subscription) oldobj;
Subscription newsub = (Subscription) newobj;
- if(!oldsub.getSubscriber().equals(newsub.getSubscriber())){
- logger.info("PROV5013 - Previous subscriber: "+oldsub.getSubscriber() +": New subscriber-"+newsub.getSubscriber());
+ if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
+ logger.info("PROV5013 - Previous subscriber: "
+ + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber());
oldsub.setSubscriber(newsub.getSubscriber());
oldsub.changeOwnerShip();
}
/**
* Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
+ *
* @return the provisioning data (as a JONObject)
*/
- private synchronized JSONObject readProvisioningJSON() {
- String url = URLUtilities.generatePeerProvURL();
+ private synchronized JSONObject readProvisioningJson() {
+ String url = URLUtilities.generatePeerProvURL();
HttpGet get = new HttpGet(url);
try {
HttpResponse response = httpclient.execute(get);
int code = response.getStatusLine().getStatusCode();
if (code != HttpServletResponse.SC_OK) {
- logger.warn("PROV5010: readProvisioningJSON failed, bad error code: "+code);
+ logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code);
return null;
}
HttpEntity entity = response.getEntity();
String ctype = entity.getContentType().getValue().trim();
- if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
- logger.warn("PROV5011: readProvisioningJSON failed, bad content type: "+ctype);
+ if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1)
+ && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
+ logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype);
return null;
}
return new JSONObject(new JSONTokener(entity.getContent()));
} catch (Exception e) {
- logger.warn("PROV5012: readProvisioningJSON failed, exception: "+e);
+ logger.warn("PROV5012: readProvisioningJson failed, exception: " + e);
return null;
} finally {
get.releaseConnection();
}
}
+
/**
- * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the
- * log records available in the remote database.
+ * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in
+ * the remote database.
+ *
* @return the bitset
*/
- private RLEBitSet readRemoteLoglist() {
+ RLEBitSet readRemoteLoglist() {
RLEBitSet bs = new RLEBitSet();
- String url = URLUtilities.generatePeerLogsURL();
+ String url = URLUtilities.generatePeerLogsURL();
//Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
- if(url.equals("")) {
+ if ("".equals(url)) {
return bs;
}
//End of fix.
HttpGet get = new HttpGet(url);
- try {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
HttpResponse response = httpclient.execute(get);
int code = response.getStatusLine().getStatusCode();
if (code != HttpServletResponse.SC_OK) {
- logger.warn("PROV5010: readRemoteLoglist failed, bad error code: "+code);
+ logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code);
return bs;
}
HttpEntity entity = response.getEntity();
String ctype = entity.getContentType().getValue().trim();
- if (!ctype.equals("text/plain")) {
- logger.warn("PROV5011: readRemoteLoglist failed, bad content type: "+ctype);
+ if (!TEXT_CT.equals(ctype)) {
+ logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
return bs;
}
InputStream is = entity.getContent();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- int ch = 0;
- while ((ch = is.read()) >= 0)
+ int ch;
+ while ((ch = is.read()) >= 0) {
bos.write(ch);
+ }
bs.set(bos.toString());
is.close();
} catch (Exception e) {
- logger.warn("PROV5012: readRemoteLoglist failed, exception: "+e);
+ logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e);
return bs;
} finally {
get.releaseConnection();
}
return bs;
}
+
/**
- * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available
- * in the remote database that we wish to copy to the local database.
+ * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that
+ * we wish to copy to the local database.
+ *
* @param bs the bitset (an RELBitSet) of log records to fetch
*/
- private void replicateDRLogs(RLEBitSet bs) {
- String url = URLUtilities.generatePeerLogsURL();
+ void replicateDataRouterLogs(RLEBitSet bs) {
+ String url = URLUtilities.generatePeerLogsURL();
HttpPost post = new HttpPost(url);
try {
- String t = bs.toString();
- HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));
+ String str = bs.toString();
+ HttpEntity body = new ByteArrayEntity(str.getBytes(), ContentType.create(TEXT_CT));
post.setEntity(body);
- if (logger.isDebugEnabled())
- logger.debug("Requesting records: "+t);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Requesting records: " + str);
+ }
HttpResponse response = httpclient.execute(post);
int code = response.getStatusLine().getStatusCode();
if (code != HttpServletResponse.SC_OK) {
- logger.warn("PROV5010: replicateDRLogs failed, bad error code: "+code);
+ logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code);
return;
}
HttpEntity entity = response.getEntity();
String ctype = entity.getContentType().getValue().trim();
- if (!ctype.equals("text/plain")) {
- logger.warn("PROV5011: replicateDRLogs failed, bad content type: "+ctype);
+ if (!TEXT_CT.equals(ctype)) {
+ logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype);
return;
}
String spoolname = "" + System.currentTimeMillis();
Path tmppath = Paths.get(spooldir, spoolname);
- Path donepath = Paths.get(spooldir, "IN."+spoolname);
+ Path donepath = Paths.get(spooldir, "IN." + spoolname);
Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
- logger.info("Approximately "+bs.cardinality()+" records replicated.");
+ logger.info("Approximately " + bs.cardinality() + " records replicated.");
} catch (Exception e) {
- logger.warn("PROV5012: replicateDRLogs failed, exception: "+e);
+ logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e);
} finally {
post.releaseConnection();
}