X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=datarouter-prov%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fprovisioning%2FSynchronizerTask.java;h=3097a9dbeb13ac0babda93de8f898972b83ba7d6;hb=refs%2Fchanges%2F51%2F78851%2F11;hp=2e0fc21c5bce2d6b5d6791652d737845d49839e2;hpb=bce219cdadbad724b1c6b2704695d8adf11eb65d;p=dmaap%2Fdatarouter.git diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java index 2e0fc21c..3097a9db 100644 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java @@ -1,613 +1,667 @@ -/******************************************************************************* - * ============LICENSE_START================================================== - * * org.onap.dmaap - * * =========================================================================== - * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * * =========================================================================== - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * ============LICENSE_END==================================================== - * * - * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * * - ******************************************************************************/ - - -package org.onap.dmaap.datarouter.provisioning; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; -import java.security.KeyStore; -import java.sql.Connection; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.TreeSet; - -import javax.servlet.http.HttpServletResponse; - -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.conn.scheme.Scheme; -import org.apache.http.conn.ssl.SSLSocketFactory; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.entity.ContentType; -import org.apache.http.impl.client.AbstractHttpClient; -import org.apache.http.impl.client.DefaultHttpClient; -import org.apache.log4j.Logger; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; -import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute; -import org.onap.dmaap.datarouter.provisioning.beans.Feed; -import org.onap.dmaap.datarouter.provisioning.beans.Group; -import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute; -import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute; -import org.onap.dmaap.datarouter.provisioning.beans.Parameters; -import org.onap.dmaap.datarouter.provisioning.beans.Subscription; -import org.onap.dmaap.datarouter.provisioning.beans.Syncable; -import org.onap.dmaap.datarouter.provisioning.utils.DB; -import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader; -import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet; -import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities; - -/** - * This class handles synchronization between provisioning servers (PODs). It has three primary functions: - *
    - *
  1. Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to - * the active (master) POD.
  2. - *
  3. On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.
  4. - *
  5. Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN) - * of this POD.
  6. - *
- *

For this to work correctly, the following code needs to be placed at the beginning of main().

- * - * Security.setProperty("networkaddress.cache.ttl", "10"); - * - * - * @author Robert Eby - * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $ - */ -public class SynchronizerTask extends TimerTask { - /** This is a singleton -- there is only one SynchronizerTask object in the server */ - private static SynchronizerTask synctask; - - /** This POD is unknown -- not on the list of PODs */ - public static final int UNKNOWN = 0; - /** This POD is active -- on the list of PODs, and the DNS CNAME points to us */ - public static final int ACTIVE = 1; - /** This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us */ - public static final int STANDBY = 2; - private static final String[] stnames = { "UNKNOWN", "ACTIVE", "STANDBY" }; - private static final long ONE_HOUR = 60 * 60 * 1000L; - - private final Logger logger; - private final Timer rolex; - private final String spooldir; - private int state; - private boolean doFetch; - private long nextsynctime; - private AbstractHttpClient httpclient = null; - - /** - * Get the singleton SynchronizerTask object. - * @return the SynchronizerTask - */ - public static synchronized SynchronizerTask getSynchronizer() { - if (synctask == null) - synctask = new SynchronizerTask(); - return synctask; - } - - @SuppressWarnings("deprecation") - private SynchronizerTask() { - logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal"); - rolex = new Timer(); - spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir"); - state = UNKNOWN; - doFetch = true; // start off with a fetch - nextsynctime = 0; - - logger.info("PROV5000: Sync task starting, server state is UNKNOWN"); - try { - Properties props = (new DB()).getProperties(); - String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks"); - String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY); - String pass = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY); - KeyStore keyStore = KeyStore.getInstance(type); - FileInputStream instream = new FileInputStream(new File(store)); - keyStore.load(instream, pass.toCharArray()); - instream.close(); - - store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY); - pass = props.getProperty(Main.TRUSTSTORE_PASSWORD_PROPERTY); - KeyStore trustStore = null; - if (store != null && store.length() > 0) { - trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - instream = new FileInputStream(new File(store)); - trustStore.load(instream, pass.toCharArray()); - instream.close(); - } - - // We are connecting with the node name, but the certificate will have the CNAME - // So we need to accept a non-matching certificate name - String keystorepass = props.getProperty(Main.KEYSTORE_PASSWORD_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref - AbstractHttpClient hc = new DefaultHttpClient(); - SSLSocketFactory socketFactory = - (trustStore == null) - ? new SSLSocketFactory(keyStore, keystorepass) - : new SSLSocketFactory(keyStore, keystorepass, trustStore); - socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); - Scheme sch = new Scheme("https", 443, socketFactory); - hc.getConnectionManager().getSchemeRegistry().register(sch); - httpclient = hc; - - // Run once every 5 seconds to check DNS, etc. - long interval = 0; - try { - String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000"); - interval = Long.parseLong(s); - } catch (NumberFormatException e) { - interval = 5000L; - } - rolex.scheduleAtFixedRate(this, 0L, interval); - } catch (Exception e) { - logger.warn("PROV5005: Problem starting the synchronizer: "+e); - } - } - - /** - * What is the state of this POD? - * @return one of ACTIVE, STANDBY, UNKNOWN - */ - public int getState() { - return state; - } - - /** - * Is this the active POD? - * @return true if we are active (the master), false otherwise - */ - public boolean isActive() { - return state == ACTIVE; - } - - /** - * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, - * and that we should re-synchronize with the master. - */ - public void doFetch() { - doFetch = true; - } - - /** - * Runs once a minute in order to
    - *
  1. lookup DNS names,
  2. - *
  3. determine the state of this POD,
  4. - *
  5. if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.
  6. - *
  7. if this is a standby POD, check if there are any new log records to be replicated.
  8. - *
- */ - @Override - public void run() { - try { - state = lookupState(); - if (state == STANDBY) { - // Only copy provisioning data FROM the active server TO the standby - if (doFetch || (System.currentTimeMillis() >= nextsynctime)) { - logger.debug("Initiating a sync..."); - JSONObject jo = readProvisioningJSON(); - if (jo != null) { - doFetch = false; - syncFeeds( jo.getJSONArray("feeds")); - syncSubs( jo.getJSONArray("subscriptions")); - syncGroups( jo.getJSONArray("groups")); //Rally:US708115 - 1610 - syncParams(jo.getJSONObject("parameters")); - // The following will not be present in a version=1.0 provfeed - JSONArray ja = jo.optJSONArray("ingress"); - if (ja != null) - syncIngressRoutes(ja); - JSONObject j2 = jo.optJSONObject("egress"); - if (j2 != null) - syncEgressRoutes( j2); - ja = jo.optJSONArray("routing"); - if (ja != null) - syncNetworkRoutes(ja); - } - logger.info("PROV5013: Sync completed."); - nextsynctime = System.currentTimeMillis() + ONE_HOUR; - } - } else { - // Don't do fetches on non-standby PODs - doFetch = false; - } - - // Fetch DR logs as needed - server to server - LogfileLoader lfl = LogfileLoader.getLoader(); - if (lfl.isIdle()) { - // Only fetch new logs if the loader is waiting for them. - logger.trace("Checking for logs to replicate..."); - RLEBitSet local = lfl.getBitSet(); - RLEBitSet remote = readRemoteLoglist(); - remote.andNot(local); - if (!remote.isEmpty()) { - logger.debug(" Replicating logs: "+remote); - replicateDRLogs(remote); - } - } - } catch (Exception e) { - logger.warn("PROV0020: Caught exception in SynchronizerTask: "+e); - e.printStackTrace(); - } - } - - /** - * This method is used to lookup the CNAME that points to the active server. - * It returns 0 (UNKNOWN), 1(ACTIVE), or 2 (STANDBY) to indicate the state of this server. - * @return the current state - */ - private int lookupState() { - int newstate = UNKNOWN; - try { - InetAddress myaddr = InetAddress.getLocalHost(); - if (logger.isTraceEnabled()) - logger.trace("My address: "+myaddr); - String this_pod = myaddr.getHostName(); - Set pods = new TreeSet(Arrays.asList(BaseServlet.getPods())); - if (pods.contains(this_pod)) { - InetAddress pserver = InetAddress.getByName(BaseServlet.active_prov_name); - newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY; - if (logger.isDebugEnabled() && System.currentTimeMillis() >= next_msg) { - logger.debug("Active POD = "+pserver+", Current state is "+stnames[newstate]); - next_msg = System.currentTimeMillis() + (5 * 60 * 1000L); - } - } else { - logger.warn("PROV5003: My name ("+this_pod+") is missing from the list of provisioning servers."); - } - } catch (UnknownHostException e) { - logger.warn("PROV5002: Cannot determine the name of this provisioning server."); - } - - if (newstate != state) - logger.info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate])); - return newstate; - } - private static long next_msg = 0; // only display the "Current state" msg every 5 mins. - /** Synchronize the Feeds in the JSONArray, with the Feeds in the DB. */ - private void syncFeeds(JSONArray ja) { - Collection coll = new ArrayList(); - for (int n = 0; n < ja.length(); n++) { - try { - Feed f = new Feed(ja.getJSONObject(n)); - coll.add(f); - } catch (Exception e) { - logger.warn("PROV5004: Invalid object in feed: "+ja.optJSONObject(n)); - } - } - if (sync(coll, Feed.getAllFeeds())) - BaseServlet.provisioningDataChanged(); - } - /** Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB. */ - private void syncSubs(JSONArray ja) { - Collection coll = new ArrayList(); - for (int n = 0; n < ja.length(); n++) { - try { - //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047. - JSONObject j = ja.getJSONObject(n); - j.put("sync", "true"); - Subscription s = new Subscription(j); - coll.add(s); - } catch (Exception e) { - logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n)); - } - } - if (sync(coll, Subscription.getAllSubscriptions())) - BaseServlet.provisioningDataChanged(); - } - - /** Rally:US708115 - Synchronize the Groups in the JSONArray, with the Groups in the DB. */ - private void syncGroups(JSONArray ja) { - Collection coll = new ArrayList(); - for (int n = 0; n < ja.length(); n++) { - try { - Group g = new Group(ja.getJSONObject(n)); - coll.add(g); - } catch (Exception e) { - logger.warn("PROV5004: Invalid object in subscription: "+ja.optJSONObject(n)); - } - } - if (sync(coll, Group.getAllgroups())) - BaseServlet.provisioningDataChanged(); - } - - - /** Synchronize the Parameters in the JSONObject, with the Parameters in the DB. */ - private void syncParams(JSONObject jo) { - Collection coll = new ArrayList(); - for (String k : jo.keySet()) { - String v = ""; - try { - v = jo.getString(k); - } catch (JSONException e) { - try { - v = ""+jo.getInt(k); - } catch (JSONException e1) { - JSONArray ja = jo.getJSONArray(k); - for (int i = 0; i < ja.length(); i++) { - if (i > 0) - v += "|"; - v += ja.getString(i); - } - } - } - coll.add(new Parameters(k, v)); - } - if (sync(coll, Parameters.getParameterCollection())) { - BaseServlet.provisioningDataChanged(); - BaseServlet.provisioningParametersChanged(); - } - } - private void syncIngressRoutes(JSONArray ja) { - Collection coll = new ArrayList(); - for (int n = 0; n < ja.length(); n++) { - try { - IngressRoute in = new IngressRoute(ja.getJSONObject(n)); - coll.add(in); - } catch (NumberFormatException e) { - logger.warn("PROV5004: Invalid object in ingress routes: "+ja.optJSONObject(n)); - } - } - if (sync(coll, IngressRoute.getAllIngressRoutes())) - BaseServlet.provisioningDataChanged(); - } - private void syncEgressRoutes(JSONObject jo) { - Collection coll = new ArrayList(); - for (String key : jo.keySet()) { - try { - int sub = Integer.parseInt(key); - String node = jo.getString(key); - EgressRoute er = new EgressRoute(sub, node); - coll.add(er); - } catch (NumberFormatException e) { - logger.warn("PROV5004: Invalid subid in egress routes: "+key); - } catch (IllegalArgumentException e) { - logger.warn("PROV5004: Invalid node name in egress routes: "+key); - } - } - if (sync(coll, EgressRoute.getAllEgressRoutes())) - BaseServlet.provisioningDataChanged(); - } - private void syncNetworkRoutes(JSONArray ja) { - Collection coll = new ArrayList(); - for (int n = 0; n < ja.length(); n++) { - try { - NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n)); - coll.add(nr); - } catch (JSONException e) { - logger.warn("PROV5004: Invalid object in network routes: "+ja.optJSONObject(n)); - } - } - if (sync(coll, NetworkRoute.getAllNetworkRoutes())) - BaseServlet.provisioningDataChanged(); - } - private boolean sync(Collection newc, Collection oldc) { - boolean changes = false; - try { - Map newmap = getMap(newc); - Map oldmap = getMap(oldc); - Set union = new TreeSet(newmap.keySet()); - union.addAll(oldmap.keySet()); - DB db = new DB(); - @SuppressWarnings("resource") - Connection conn = db.getConnection(); - for (String n : union) { - Syncable newobj = newmap.get(n); - Syncable oldobj = oldmap.get(n); - if (oldobj == null) { - if (logger.isDebugEnabled()) - logger.debug(" Inserting record: "+newobj); - newobj.doInsert(conn); - changes = true; - } else if (newobj == null) { - if (logger.isDebugEnabled()) - logger.debug(" Deleting record: "+oldobj); - oldobj.doDelete(conn); - changes = true; - } else if (!newobj.equals(oldobj)) { - if (logger.isDebugEnabled()) - logger.debug(" Updating record: "+newobj); - newobj.doUpdate(conn); - - /**Rally US708115 - * Change Ownership of FEED - 1610, Syncronised with secondary DB. - * */ - checkChnageOwner(newobj, oldobj); - - changes = true; - } - } - db.release(conn); - } catch (SQLException e) { - logger.warn("PROV5009: problem during sync, exception: "+e); - e.printStackTrace(); - } - return changes; - } - private Map getMap(Collection c) { - Map map = new HashMap(); - for (Syncable v : c) { - map.put(v.getKey(), v); - } - return map; - } - - - /**Change owner of FEED/SUBSCRIPTION*/ - /**Rally US708115 - * Change Ownership of FEED - 1610 - * - * */ - private void checkChnageOwner(Syncable newobj, Syncable oldobj) { - if(newobj instanceof Feed) { - Feed oldfeed = (Feed) oldobj; - Feed newfeed = (Feed) newobj; - - if(!oldfeed.getPublisher().equals(newfeed.getPublisher())){ - logger.info("PROV5013 - Previous publisher: "+oldfeed.getPublisher() +": New publisher-"+newfeed.getPublisher()); - oldfeed.setPublisher(newfeed.getPublisher()); - oldfeed.changeOwnerShip(); - } - } - else if(newobj instanceof Subscription) { - Subscription oldsub = (Subscription) oldobj; - Subscription newsub = (Subscription) newobj; - - if(!oldsub.getSubscriber().equals(newsub.getSubscriber())){ - logger.info("PROV5013 - Previous subscriber: "+oldsub.getSubscriber() +": New subscriber-"+newsub.getSubscriber()); - oldsub.setSubscriber(newsub.getSubscriber()); - oldsub.changeOwnerShip(); - } - } - - } - - /** - * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data. - * @return the provisioning data (as a JONObject) - */ - private synchronized JSONObject readProvisioningJSON() { - String url = URLUtilities.generatePeerProvURL(); - HttpGet get = new HttpGet(url); - try { - HttpResponse response = httpclient.execute(get); - int code = response.getStatusLine().getStatusCode(); - if (code != HttpServletResponse.SC_OK) { - logger.warn("PROV5010: readProvisioningJSON failed, bad error code: "+code); - return null; - } - HttpEntity entity = response.getEntity(); - String ctype = entity.getContentType().getValue().trim(); - if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) { - logger.warn("PROV5011: readProvisioningJSON failed, bad content type: "+ctype); - return null; - } - return new JSONObject(new JSONTokener(entity.getContent())); - } catch (Exception e) { - logger.warn("PROV5012: readProvisioningJSON failed, exception: "+e); - return null; - } finally { - get.releaseConnection(); - } - } - /** - * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the - * log records available in the remote database. - * @return the bitset - */ - private RLEBitSet readRemoteLoglist() { - RLEBitSet bs = new RLEBitSet(); - String url = URLUtilities.generatePeerLogsURL(); - - //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset. - if(url.equals("")) { - return bs; - } - //End of fix. - - HttpGet get = new HttpGet(url); - try { - HttpResponse response = httpclient.execute(get); - int code = response.getStatusLine().getStatusCode(); - if (code != HttpServletResponse.SC_OK) { - logger.warn("PROV5010: readRemoteLoglist failed, bad error code: "+code); - return bs; - } - HttpEntity entity = response.getEntity(); - String ctype = entity.getContentType().getValue().trim(); - if (!ctype.equals("text/plain")) { - logger.warn("PROV5011: readRemoteLoglist failed, bad content type: "+ctype); - return bs; - } - InputStream is = entity.getContent(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int ch = 0; - while ((ch = is.read()) >= 0) - bos.write(ch); - bs.set(bos.toString()); - is.close(); - } catch (Exception e) { - logger.warn("PROV5012: readRemoteLoglist failed, exception: "+e); - return bs; - } finally { - get.releaseConnection(); - } - return bs; - } - /** - * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available - * in the remote database that we wish to copy to the local database. - * @param bs the bitset (an RELBitSet) of log records to fetch - */ - private void replicateDRLogs(RLEBitSet bs) { - String url = URLUtilities.generatePeerLogsURL(); - HttpPost post = new HttpPost(url); - try { - String t = bs.toString(); - HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain")); - post.setEntity(body); - if (logger.isDebugEnabled()) - logger.debug("Requesting records: "+t); - - HttpResponse response = httpclient.execute(post); - int code = response.getStatusLine().getStatusCode(); - if (code != HttpServletResponse.SC_OK) { - logger.warn("PROV5010: replicateDRLogs failed, bad error code: "+code); - return; - } - HttpEntity entity = response.getEntity(); - String ctype = entity.getContentType().getValue().trim(); - if (!ctype.equals("text/plain")) { - logger.warn("PROV5011: replicateDRLogs failed, bad content type: "+ctype); - return; - } - - String spoolname = "" + System.currentTimeMillis(); - Path tmppath = Paths.get(spooldir, spoolname); - Path donepath = Paths.get(spooldir, "IN."+spoolname); - Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING); - Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING); - logger.info("Approximately "+bs.cardinality()+" records replicated."); - } catch (Exception e) { - logger.warn("PROV5012: replicateDRLogs failed, exception: "+e); - } finally { - post.releaseConnection(); - } - } -} +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.provisioning; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.security.KeyStore; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.TreeSet; + +import javax.servlet.http.HttpServletResponse; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.scheme.Scheme; +import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.AbstractHttpClient; +import org.apache.http.impl.client.DefaultHttpClient; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute; +import org.onap.dmaap.datarouter.provisioning.beans.Feed; +import org.onap.dmaap.datarouter.provisioning.beans.Group; +import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute; +import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute; +import org.onap.dmaap.datarouter.provisioning.beans.Parameters; +import org.onap.dmaap.datarouter.provisioning.beans.Subscription; +import org.onap.dmaap.datarouter.provisioning.beans.Syncable; +import org.onap.dmaap.datarouter.provisioning.utils.DB; +import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader; +import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet; +import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities; + +/** + * This class handles synchronization between provisioning servers (PODs). It has three primary functions: + *
    + *
  1. Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to + * the active (master) POD.
  2. + *
  3. On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.
  4. + *
  5. Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN) + * of this POD.
  6. + *
+ *

For this to work correctly, the following code needs to be placed at the beginning of main().

+ * + * Security.setProperty("networkaddress.cache.ttl", "10"); + * + * + * @author Robert Eby + * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $ + */ +public class SynchronizerTask extends TimerTask { + + /** + * This is a singleton -- there is only one SynchronizerTask object in the server + */ + private static SynchronizerTask synctask; + + /** + * This POD is unknown -- not on the list of PODs + */ + public static final int UNKNOWN = 0; + /** + * This POD is active -- on the list of PODs, and the DNS CNAME points to us + */ + public static final int ACTIVE = 1; + /** + * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us + */ + public static final int STANDBY = 2; + private static final String[] stnames = {"UNKNOWN", "ACTIVE", "STANDBY"}; + private static final long ONE_HOUR = 60 * 60 * 1000L; + + private final EELFLogger logger; + private final Timer rolex; + private final String spooldir; + private int state; + private boolean doFetch; + private long nextsynctime; + private AbstractHttpClient httpclient = null; + + /** + * Get the singleton SynchronizerTask object. + * + * @return the SynchronizerTask + */ + public static synchronized SynchronizerTask getSynchronizer() { + if (synctask == null) { + synctask = new SynchronizerTask(); + } + return synctask; + } + + @SuppressWarnings("deprecation") + private SynchronizerTask() { + logger = EELFManager.getInstance().getLogger("InternalLog"); + rolex = new Timer(); + spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir"); + state = UNKNOWN; + doFetch = true; // start off with a fetch + nextsynctime = 0; + + logger.info("PROV5000: Sync task starting, server state is UNKNOWN"); + try { + Properties props = (new DB()).getProperties(); + String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks"); + String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY); + String pass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY); + KeyStore keyStore = KeyStore.getInstance(type); + try(FileInputStream instream = new FileInputStream(new File(store))) { + keyStore.load(instream, pass.toCharArray()); + + } + store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY); + pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY); + KeyStore trustStore = null; + if (store != null && store.length() > 0) { + trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + try(FileInputStream instream = new FileInputStream(new File(store))){ + trustStore.load(instream, pass.toCharArray()); + + } + } + + // We are connecting with the node name, but the certificate will have the CNAME + // So we need to accept a non-matching certificate name + String keystorepass = props.getProperty( + Main.KEYSTORE_PASS_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref + try(AbstractHttpClient hc = new DefaultHttpClient()) { + SSLSocketFactory socketFactory = + (trustStore == null) + ? new SSLSocketFactory(keyStore, keystorepass) + : new SSLSocketFactory(keyStore, keystorepass, trustStore); + socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); + Scheme sch = new Scheme("https", 443, socketFactory); + hc.getConnectionManager().getSchemeRegistry().register(sch); + httpclient = hc; + } + // Run once every 5 seconds to check DNS, etc. + long interval = 0; + try { + String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000"); + interval = Long.parseLong(s); + } catch (NumberFormatException e) { + interval = 5000L; + } + rolex.scheduleAtFixedRate(this, 0L, interval); + } catch (Exception e) { + logger.warn("PROV5005: Problem starting the synchronizer: " + e); + } + } + + /** + * What is the state of this POD? + * + * @return one of ACTIVE, STANDBY, UNKNOWN + */ + public int getState() { + return state; + } + + /** + * Is this the active POD? + * + * @return true if we are active (the master), false otherwise + */ + public boolean isActive() { + return state == ACTIVE; + } + + /** + * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we + * should re-synchronize with the master. + */ + public void doFetch() { + doFetch = true; + } + + /** + * Runs once a minute in order to
    + *
  1. lookup DNS names,
  2. + *
  3. determine the state of this POD,
  4. + *
  5. if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.
  6. + *
  7. if this is a standby POD, check if there are any new log records to be replicated.
  8. + *
+ */ + @Override + public void run() { + try { + state = lookupState(); + if (state == STANDBY) { + // Only copy provisioning data FROM the active server TO the standby + if (doFetch || (System.currentTimeMillis() >= nextsynctime)) { + logger.debug("Initiating a sync..."); + JSONObject jo = readProvisioningJSON(); + if (jo != null) { + doFetch = false; + syncFeeds(jo.getJSONArray("feeds")); + syncSubs(jo.getJSONArray("subscriptions")); + syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610 + syncParams(jo.getJSONObject("parameters")); + // The following will not be present in a version=1.0 provfeed + JSONArray ja = jo.optJSONArray("ingress"); + if (ja != null) { + syncIngressRoutes(ja); + } + JSONObject j2 = jo.optJSONObject("egress"); + if (j2 != null) { + syncEgressRoutes(j2); + } + ja = jo.optJSONArray("routing"); + if (ja != null) { + syncNetworkRoutes(ja); + } + } + logger.info("PROV5013: Sync completed."); + nextsynctime = System.currentTimeMillis() + ONE_HOUR; + } + } else { + // Don't do fetches on non-standby PODs + doFetch = false; + } + + // Fetch DR logs as needed - server to server + LogfileLoader lfl = LogfileLoader.getLoader(); + if (lfl.isIdle()) { + // Only fetch new logs if the loader is waiting for them. + logger.trace("Checking for logs to replicate..."); + RLEBitSet local = lfl.getBitSet(); + RLEBitSet remote = readRemoteLoglist(); + remote.andNot(local); + if (!remote.isEmpty()) { + logger.debug(" Replicating logs: " + remote); + replicateDRLogs(remote); + } + } + } catch (Exception e) { + logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e); + } + } + + /** + * This method is used to lookup the CNAME that points to the active server. It returns 0 (UNKNOWN), 1(ACTIVE), or 2 + * (STANDBY) to indicate the state of this server. + * + * @return the current state + */ + private int lookupState() { + int newstate = UNKNOWN; + try { + InetAddress myaddr = InetAddress.getLocalHost(); + if (logger.isTraceEnabled()) { + logger.trace("My address: " + myaddr); + } + String thisPod = myaddr.getHostName(); + Set pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods())); + if (pods.contains(thisPod)) { + InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName()); + newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY; + if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) { + logger.debug("Active POD = " + pserver + ", Current state is " + stnames[newstate]); + nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L); + } + } else { + logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers."); + } + } catch (UnknownHostException e) { + logger.warn("PROV5002: Cannot determine the name of this provisioning server."); + } + + if (newstate != state) { + logger + .info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate])); + } + return newstate; + } + + private static long nextMsg = 0; // only display the "Current state" msg every 5 mins. + + /** + * Synchronize the Feeds in the JSONArray, with the Feeds in the DB. + */ + private void syncFeeds(JSONArray ja) { + Collection coll = new ArrayList<>(); + for (int n = 0; n < ja.length(); n++) { + try { + Feed f = new Feed(ja.getJSONObject(n)); + coll.add(f); + } catch (Exception e) { + logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n)); + } + } + if (sync(coll, Feed.getAllFeeds())) { + BaseServlet.provisioningDataChanged(); + } + } + + /** + * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB. + */ + private void syncSubs(JSONArray ja) { + Collection coll = new ArrayList<>(); + for (int n = 0; n < ja.length(); n++) { + try { + //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047. + JSONObject j = ja.getJSONObject(n); + j.put("sync", "true"); + Subscription s = new Subscription(j); + coll.add(s); + } catch (Exception e) { + logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n)); + } + } + if (sync(coll, Subscription.getAllSubscriptions())) { + BaseServlet.provisioningDataChanged(); + } + } + + /** + * Rally:US708115 - Synchronize the Groups in the JSONArray, with the Groups in the DB. + */ + private void syncGroups(JSONArray ja) { + Collection coll = new ArrayList<>(); + for (int n = 0; n < ja.length(); n++) { + try { + Group g = new Group(ja.getJSONObject(n)); + coll.add(g); + } catch (Exception e) { + logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n)); + } + } + if (sync(coll, Group.getAllgroups())) { + BaseServlet.provisioningDataChanged(); + } + } + + + /** + * Synchronize the Parameters in the JSONObject, with the Parameters in the DB. + */ + private void syncParams(JSONObject jo) { + Collection coll = new ArrayList<>(); + for (String k : jo.keySet()) { + String v = ""; + try { + v = jo.getString(k); + } catch (JSONException e) { + try { + v = "" + jo.getInt(k); + } catch (JSONException e1) { + JSONArray ja = jo.getJSONArray(k); + for (int i = 0; i < ja.length(); i++) { + if (i > 0) { + v += "|"; + } + v += ja.getString(i); + } + } + } + coll.add(new Parameters(k, v)); + } + if (sync(coll, Parameters.getParameterCollection())) { + BaseServlet.provisioningDataChanged(); + BaseServlet.provisioningParametersChanged(); + } + } + + private void syncIngressRoutes(JSONArray ja) { + Collection coll = new ArrayList<>(); + for (int n = 0; n < ja.length(); n++) { + try { + IngressRoute in = new IngressRoute(ja.getJSONObject(n)); + coll.add(in); + } catch (NumberFormatException e) { + logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n)); + } + } + if (sync(coll, IngressRoute.getAllIngressRoutes())) { + BaseServlet.provisioningDataChanged(); + } + } + + private void syncEgressRoutes(JSONObject jo) { + Collection coll = new ArrayList<>(); + for (String key : jo.keySet()) { + try { + int sub = Integer.parseInt(key); + String node = jo.getString(key); + EgressRoute er = new EgressRoute(sub, node); + coll.add(er); + } catch (NumberFormatException e) { + logger.warn("PROV5004: Invalid subid in egress routes: " + key); + } catch (IllegalArgumentException e) { + logger.warn("PROV5004: Invalid node name in egress routes: " + key); + } + } + if (sync(coll, EgressRoute.getAllEgressRoutes())) { + BaseServlet.provisioningDataChanged(); + } + } + + private void syncNetworkRoutes(JSONArray ja) { + Collection coll = new ArrayList<>(); + for (int n = 0; n < ja.length(); n++) { + try { + NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n)); + coll.add(nr); + } catch (JSONException e) { + logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n)); + } + } + if (sync(coll, NetworkRoute.getAllNetworkRoutes())) { + BaseServlet.provisioningDataChanged(); + } + } + + private boolean sync(Collection newc, Collection oldc) { + boolean changes = false; + try { + Map newmap = getMap(newc); + Map oldmap = getMap(oldc); + Set union = new TreeSet<>(newmap.keySet()); + union.addAll(oldmap.keySet()); + DB db = new DB(); + @SuppressWarnings("resource") + Connection conn = db.getConnection(); + for (String n : union) { + Syncable newobj = newmap.get(n); + Syncable oldobj = oldmap.get(n); + if (oldobj == null) { + if (logger.isDebugEnabled()) { + logger.debug(" Inserting record: " + newobj); + } + newobj.doInsert(conn); + changes = true; + } else if (newobj == null) { + if (logger.isDebugEnabled()) { + logger.debug(" Deleting record: " + oldobj); + } + oldobj.doDelete(conn); + changes = true; + } else if (!newobj.equals(oldobj)) { + if (logger.isDebugEnabled()) { + logger.debug(" Updating record: " + newobj); + } + newobj.doUpdate(conn); + + /**Rally US708115 + * Change Ownership of FEED - 1610, Syncronised with secondary DB. + * */ + checkChnageOwner(newobj, oldobj); + + changes = true; + } + } + db.release(conn); + } catch (SQLException e) { + logger.warn("PROV5009: problem during sync, exception: " + e); + } + return changes; + } + + private Map getMap(Collection c) { + Map map = new HashMap<>(); + for (Syncable v : c) { + map.put(v.getKey(), v); + } + return map; + } + + /**Change owner of FEED/SUBSCRIPTION*/ + /** + * Rally US708115 Change Ownership of FEED - 1610 + */ + private void checkChnageOwner(Syncable newobj, Syncable oldobj) { + if (newobj instanceof Feed) { + Feed oldfeed = (Feed) oldobj; + Feed newfeed = (Feed) newobj; + + if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) { + logger.info("PROV5013 - Previous publisher: " + oldfeed.getPublisher() + ": New publisher-" + newfeed + .getPublisher()); + oldfeed.setPublisher(newfeed.getPublisher()); + oldfeed.changeOwnerShip(); + } + } else if (newobj instanceof Subscription) { + Subscription oldsub = (Subscription) oldobj; + Subscription newsub = (Subscription) newobj; + + if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) { + logger.info("PROV5013 - Previous subscriber: " + oldsub.getSubscriber() + ": New subscriber-" + newsub + .getSubscriber()); + oldsub.setSubscriber(newsub.getSubscriber()); + oldsub.changeOwnerShip(); + } + } + + } + + /** + * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data. + * + * @return the provisioning data (as a JONObject) + */ + private synchronized JSONObject readProvisioningJSON() { + String url = URLUtilities.generatePeerProvURL(); + HttpGet get = new HttpGet(url); + try { + HttpResponse response = httpclient.execute(get); + int code = response.getStatusLine().getStatusCode(); + if (code != HttpServletResponse.SC_OK) { + logger.warn("PROV5010: readProvisioningJSON failed, bad error code: " + code); + return null; + } + HttpEntity entity = response.getEntity(); + String ctype = entity.getContentType().getValue().trim(); + if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype + .equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) { + logger.warn("PROV5011: readProvisioningJSON failed, bad content type: " + ctype); + return null; + } + return new JSONObject(new JSONTokener(entity.getContent())); + } catch (Exception e) { + logger.warn("PROV5012: readProvisioningJSON failed, exception: " + e); + return null; + } finally { + get.releaseConnection(); + } + } + + /** + * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in + * the remote database. + * + * @return the bitset + */ + private RLEBitSet readRemoteLoglist() { + RLEBitSet bs = new RLEBitSet(); + String url = URLUtilities.generatePeerLogsURL(); + + //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset. + if (url.equals("")) { + return bs; + } + //End of fix. + + HttpGet get = new HttpGet(url); + try { + HttpResponse response = httpclient.execute(get); + int code = response.getStatusLine().getStatusCode(); + if (code != HttpServletResponse.SC_OK) { + logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code); + return bs; + } + HttpEntity entity = response.getEntity(); + String ctype = entity.getContentType().getValue().trim(); + if (!ctype.equals("text/plain")) { + logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype); + return bs; + } + InputStream is = entity.getContent(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int ch = 0; + while ((ch = is.read()) >= 0) { + bos.write(ch); + } + bs.set(bos.toString()); + is.close(); + } catch (Exception e) { + logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e); + return bs; + } finally { + get.releaseConnection(); + } + return bs; + } + + /** + * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that + * we wish to copy to the local database. + * + * @param bs the bitset (an RELBitSet) of log records to fetch + */ + private void replicateDRLogs(RLEBitSet bs) { + String url = URLUtilities.generatePeerLogsURL(); + HttpPost post = new HttpPost(url); + try { + String t = bs.toString(); + HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain")); + post.setEntity(body); + if (logger.isDebugEnabled()) { + logger.debug("Requesting records: " + t); + } + + HttpResponse response = httpclient.execute(post); + int code = response.getStatusLine().getStatusCode(); + if (code != HttpServletResponse.SC_OK) { + logger.warn("PROV5010: replicateDRLogs failed, bad error code: " + code); + return; + } + HttpEntity entity = response.getEntity(); + String ctype = entity.getContentType().getValue().trim(); + if (!ctype.equals("text/plain")) { + logger.warn("PROV5011: replicateDRLogs failed, bad content type: " + ctype); + return; + } + + String spoolname = "" + System.currentTimeMillis(); + Path tmppath = Paths.get(spooldir, spoolname); + Path donepath = Paths.get(spooldir, "IN." + spoolname); + Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING); + Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING); + logger.info("Approximately " + bs.cardinality() + " records replicated."); + } catch (Exception e) { + logger.warn("PROV5012: replicateDRLogs failed, exception: " + e); + } finally { + post.releaseConnection(); + } + } +}