1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.provisioning;
27 import java.io.ByteArrayOutputStream;
29 import java.io.FileInputStream;
30 import java.io.InputStream;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.nio.file.Files;
34 import java.nio.file.Path;
35 import java.nio.file.Paths;
36 import java.nio.file.StandardCopyOption;
37 import java.security.KeyStore;
38 import java.sql.Connection;
39 import java.sql.SQLException;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.HashMap;
45 import java.util.Properties;
47 import java.util.Timer;
48 import java.util.TimerTask;
49 import java.util.TreeSet;
51 import javax.servlet.http.HttpServletResponse;
53 import com.att.eelf.configuration.EELFLogger;
54 import com.att.eelf.configuration.EELFManager;
55 import org.apache.http.HttpEntity;
56 import org.apache.http.HttpResponse;
57 import org.apache.http.client.methods.HttpGet;
58 import org.apache.http.client.methods.HttpPost;
59 import org.apache.http.conn.scheme.Scheme;
60 import org.apache.http.conn.ssl.SSLSocketFactory;
61 import org.apache.http.entity.ByteArrayEntity;
62 import org.apache.http.entity.ContentType;
63 import org.apache.http.impl.client.AbstractHttpClient;
64 import org.apache.http.impl.client.DefaultHttpClient;
65 import org.json.JSONArray;
66 import org.json.JSONException;
67 import org.json.JSONObject;
68 import org.json.JSONTokener;
69 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
70 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
71 import org.onap.dmaap.datarouter.provisioning.beans.Group;
72 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
73 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
74 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
75 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
76 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
77 import org.onap.dmaap.datarouter.provisioning.utils.DB;
78 import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
79 import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
80 import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
83 * This class handles synchronization between provisioning servers (PODs). It has three primary functions:
85 * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
86 * the active (master) POD.</li>
87 * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
88 * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)
91 * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
93 * Security.setProperty("networkaddress.cache.ttl", "10");
97 * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
99 public class SynchronizerTask extends TimerTask {
102 * This is a singleton -- there is only one SynchronizerTask object in the server
104 private static SynchronizerTask synctask;
107 * This POD is unknown -- not on the list of PODs
109 public static final int UNKNOWN = 0;
111 * This POD is active -- on the list of PODs, and the DNS CNAME points to us
113 public static final int ACTIVE = 1;
115 * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us
117 public static final int STANDBY = 2;
118 private static final String[] stnames = {"UNKNOWN", "ACTIVE", "STANDBY"};
119 private static final long ONE_HOUR = 60 * 60 * 1000L;
121 private final EELFLogger logger;
122 private final Timer rolex;
123 private final String spooldir;
125 private boolean doFetch;
126 private long nextsynctime;
127 private AbstractHttpClient httpclient = null;
130 * Get the singleton SynchronizerTask object.
132 * @return the SynchronizerTask
134 public static synchronized SynchronizerTask getSynchronizer() {
135 if (synctask == null) {
136 synctask = new SynchronizerTask();
141 @SuppressWarnings("deprecation")
142 private SynchronizerTask() {
143 logger = EELFManager.getInstance().getLogger("InternalLog");
145 spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
147 doFetch = true; // start off with a fetch
150 logger.info("PROV5000: Sync task starting, server state is UNKNOWN");
152 Properties props = (new DB()).getProperties();
153 String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
154 String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
155 String pass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
156 KeyStore keyStore = KeyStore.getInstance(type);
157 try(FileInputStream instream = new FileInputStream(new File(store))) {
158 keyStore.load(instream, pass.toCharArray());
161 store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
162 pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY);
163 KeyStore trustStore = null;
164 if (store != null && store.length() > 0) {
165 trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
166 try(FileInputStream instream = new FileInputStream(new File(store))){
167 trustStore.load(instream, pass.toCharArray());
172 // We are connecting with the node name, but the certificate will have the CNAME
173 // So we need to accept a non-matching certificate name
174 String keystorepass = props.getProperty(
175 Main.KEYSTORE_PASS_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref
176 try(AbstractHttpClient hc = new DefaultHttpClient()) {
177 SSLSocketFactory socketFactory =
179 ? new SSLSocketFactory(keyStore, keystorepass)
180 : new SSLSocketFactory(keyStore, keystorepass, trustStore);
181 socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
182 Scheme sch = new Scheme("https", 443, socketFactory);
183 hc.getConnectionManager().getSchemeRegistry().register(sch);
186 // Run once every 5 seconds to check DNS, etc.
189 String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
190 interval = Long.parseLong(s);
191 } catch (NumberFormatException e) {
194 rolex.scheduleAtFixedRate(this, 0L, interval);
195 } catch (Exception e) {
196 logger.warn("PROV5005: Problem starting the synchronizer: " + e);
201 * What is the state of this POD?
203 * @return one of ACTIVE, STANDBY, UNKNOWN
205 public int getState() {
210 * Is this the active POD?
212 * @return true if we are active (the master), false otherwise
214 public boolean isActive() {
215 return state == ACTIVE;
219 * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
220 * should re-synchronize with the master.
222 public void doFetch() {
227 * Runs once a minute in order to <ol>
228 * <li>lookup DNS names,</li>
229 * <li>determine the state of this POD,</li>
230 * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>
231 * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
237 state = lookupState();
238 if (state == STANDBY) {
239 // Only copy provisioning data FROM the active server TO the standby
240 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
241 logger.debug("Initiating a sync...");
242 JSONObject jo = readProvisioningJSON();
245 syncFeeds(jo.getJSONArray("feeds"));
246 syncSubs(jo.getJSONArray("subscriptions"));
247 syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
248 syncParams(jo.getJSONObject("parameters"));
249 // The following will not be present in a version=1.0 provfeed
250 JSONArray ja = jo.optJSONArray("ingress");
252 syncIngressRoutes(ja);
254 JSONObject j2 = jo.optJSONObject("egress");
256 syncEgressRoutes(j2);
258 ja = jo.optJSONArray("routing");
260 syncNetworkRoutes(ja);
263 logger.info("PROV5013: Sync completed.");
264 nextsynctime = System.currentTimeMillis() + ONE_HOUR;
267 // Don't do fetches on non-standby PODs
271 // Fetch DR logs as needed - server to server
272 LogfileLoader lfl = LogfileLoader.getLoader();
274 // Only fetch new logs if the loader is waiting for them.
275 logger.trace("Checking for logs to replicate...");
276 RLEBitSet local = lfl.getBitSet();
277 RLEBitSet remote = readRemoteLoglist();
278 remote.andNot(local);
279 if (!remote.isEmpty()) {
280 logger.debug(" Replicating logs: " + remote);
281 replicateDRLogs(remote);
284 } catch (Exception e) {
285 logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e);
290 * This method is used to lookup the CNAME that points to the active server. It returns 0 (UNKNOWN), 1(ACTIVE), or 2
291 * (STANDBY) to indicate the state of this server.
293 * @return the current state
295 private int lookupState() {
296 int newstate = UNKNOWN;
298 InetAddress myaddr = InetAddress.getLocalHost();
299 if (logger.isTraceEnabled()) {
300 logger.trace("My address: " + myaddr);
302 String thisPod = myaddr.getHostName();
303 Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
304 if (pods.contains(thisPod)) {
305 InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
306 newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;
307 if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
308 logger.debug("Active POD = " + pserver + ", Current state is " + stnames[newstate]);
309 nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
312 logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers.");
314 } catch (UnknownHostException e) {
315 logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
318 if (newstate != state) {
320 .info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));
325 private static long nextMsg = 0; // only display the "Current state" msg every 5 mins.
328 * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
330 private void syncFeeds(JSONArray ja) {
331 Collection<Syncable> coll = new ArrayList<>();
332 for (int n = 0; n < ja.length(); n++) {
334 Feed f = new Feed(ja.getJSONObject(n));
336 } catch (Exception e) {
337 logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n), e);
340 if (sync(coll, Feed.getAllFeeds())) {
341 BaseServlet.provisioningDataChanged();
346 * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB.
348 private void syncSubs(JSONArray ja) {
349 Collection<Syncable> coll = new ArrayList<>();
350 for (int n = 0; n < ja.length(); n++) {
352 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
353 JSONObject j = ja.getJSONObject(n);
354 j.put("sync", "true");
355 Subscription s = new Subscription(j);
357 } catch (Exception e) {
358 logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n), e);
361 if (sync(coll, Subscription.getAllSubscriptions())) {
362 BaseServlet.provisioningDataChanged();
367 * Rally:US708115 - Synchronize the Groups in the JSONArray, with the Groups in the DB.
369 private void syncGroups(JSONArray ja) {
370 Collection<Syncable> coll = new ArrayList<>();
371 for (int n = 0; n < ja.length(); n++) {
373 Group g = new Group(ja.getJSONObject(n));
375 } catch (Exception e) {
376 logger.warn("PROV5004: Invalid object in group: " + ja.optJSONObject(n), e);
379 if (sync(coll, Group.getAllgroups())) {
380 BaseServlet.provisioningDataChanged();
386 * Synchronize the Parameters in the JSONObject, with the Parameters in the DB.
388 private void syncParams(JSONObject jo) {
389 Collection<Syncable> coll = new ArrayList<>();
390 for (String k : jo.keySet()) {
394 } catch (JSONException e) {
395 logger.warn("PROV5004: Invalid object in parameters: " + jo.optJSONObject(k), e);
397 v = "" + jo.getInt(k);
398 } catch (JSONException e1) {
399 logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e);
400 JSONArray ja = jo.getJSONArray(k);
401 for (int i = 0; i < ja.length(); i++) {
405 v += ja.getString(i);
409 coll.add(new Parameters(k, v));
411 if (sync(coll, Parameters.getParameterCollection())) {
412 BaseServlet.provisioningDataChanged();
413 BaseServlet.provisioningParametersChanged();
417 private void syncIngressRoutes(JSONArray ja) {
418 Collection<Syncable> coll = new ArrayList<>();
419 for (int n = 0; n < ja.length(); n++) {
421 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
423 } catch (NumberFormatException e) {
424 logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n));
427 if (sync(coll, IngressRoute.getAllIngressRoutes())) {
428 BaseServlet.provisioningDataChanged();
432 private void syncEgressRoutes(JSONObject jo) {
433 Collection<Syncable> coll = new ArrayList<>();
434 for (String key : jo.keySet()) {
436 int sub = Integer.parseInt(key);
437 String node = jo.getString(key);
438 EgressRoute er = new EgressRoute(sub, node);
440 } catch (NumberFormatException e) {
441 logger.warn("PROV5004: Invalid subid in egress routes: " + key, e);
442 } catch (IllegalArgumentException e) {
443 logger.warn("PROV5004: Invalid node name in egress routes: " + key, e);
446 if (sync(coll, EgressRoute.getAllEgressRoutes())) {
447 BaseServlet.provisioningDataChanged();
451 private void syncNetworkRoutes(JSONArray ja) {
452 Collection<Syncable> coll = new ArrayList<>();
453 for (int n = 0; n < ja.length(); n++) {
455 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
457 } catch (JSONException e) {
458 logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n));
461 if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
462 BaseServlet.provisioningDataChanged();
466 private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
467 boolean changes = false;
469 Map<String, Syncable> newmap = getMap(newc);
470 Map<String, Syncable> oldmap = getMap(oldc);
471 Set<String> union = new TreeSet<>(newmap.keySet());
472 union.addAll(oldmap.keySet());
474 @SuppressWarnings("resource")
475 Connection conn = db.getConnection();
476 for (String n : union) {
477 Syncable newobj = newmap.get(n);
478 Syncable oldobj = oldmap.get(n);
479 if (oldobj == null) {
480 if (logger.isDebugEnabled()) {
481 logger.debug(" Inserting record: " + newobj);
483 newobj.doInsert(conn);
485 } else if (newobj == null) {
486 if (logger.isDebugEnabled()) {
487 logger.debug(" Deleting record: " + oldobj);
489 oldobj.doDelete(conn);
491 } else if (!newobj.equals(oldobj)) {
492 if (logger.isDebugEnabled()) {
493 logger.debug(" Updating record: " + newobj);
495 newobj.doUpdate(conn);
498 * Change Ownership of FEED - 1610, Syncronised with secondary DB.
500 checkChnageOwner(newobj, oldobj);
506 } catch (SQLException e) {
507 logger.warn("PROV5009: problem during sync, exception: " + e);
512 private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {
513 Map<String, Syncable> map = new HashMap<>();
514 for (Syncable v : c) {
515 map.put(v.getKey(), v);
520 /**Change owner of FEED/SUBSCRIPTION*/
522 * Rally US708115 Change Ownership of FEED - 1610
524 private void checkChnageOwner(Syncable newobj, Syncable oldobj) {
525 if (newobj instanceof Feed) {
526 Feed oldfeed = (Feed) oldobj;
527 Feed newfeed = (Feed) newobj;
529 if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
530 logger.info("PROV5013 - Previous publisher: " + oldfeed.getPublisher() + ": New publisher-" + newfeed
532 oldfeed.setPublisher(newfeed.getPublisher());
533 oldfeed.changeOwnerShip();
535 } else if (newobj instanceof Subscription) {
536 Subscription oldsub = (Subscription) oldobj;
537 Subscription newsub = (Subscription) newobj;
539 if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
540 logger.info("PROV5013 - Previous subscriber: " + oldsub.getSubscriber() + ": New subscriber-" + newsub
542 oldsub.setSubscriber(newsub.getSubscriber());
543 oldsub.changeOwnerShip();
550 * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
552 * @return the provisioning data (as a JONObject)
554 private synchronized JSONObject readProvisioningJSON() {
555 String url = URLUtilities.generatePeerProvURL();
556 HttpGet get = new HttpGet(url);
558 HttpResponse response = httpclient.execute(get);
559 int code = response.getStatusLine().getStatusCode();
560 if (code != HttpServletResponse.SC_OK) {
561 logger.warn("PROV5010: readProvisioningJSON failed, bad error code: " + code);
564 HttpEntity entity = response.getEntity();
565 String ctype = entity.getContentType().getValue().trim();
566 if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype
567 .equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
568 logger.warn("PROV5011: readProvisioningJSON failed, bad content type: " + ctype);
571 return new JSONObject(new JSONTokener(entity.getContent()));
572 } catch (Exception e) {
573 logger.warn("PROV5012: readProvisioningJSON failed, exception: " + e);
576 get.releaseConnection();
581 * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in
582 * the remote database.
586 private RLEBitSet readRemoteLoglist() {
587 RLEBitSet bs = new RLEBitSet();
588 String url = URLUtilities.generatePeerLogsURL();
590 //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
591 if ("".equals(url)) {
596 HttpGet get = new HttpGet(url);
598 HttpResponse response = httpclient.execute(get);
599 int code = response.getStatusLine().getStatusCode();
600 if (code != HttpServletResponse.SC_OK) {
601 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code);
604 HttpEntity entity = response.getEntity();
605 String ctype = entity.getContentType().getValue().trim();
606 if (!"text/plain".equals(ctype)) {
607 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
610 InputStream is = entity.getContent();
611 ByteArrayOutputStream bos = new ByteArrayOutputStream();
613 while ((ch = is.read()) >= 0) {
616 bs.set(bos.toString());
618 } catch (Exception e) {
619 logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e);
622 get.releaseConnection();
628 * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that
629 * we wish to copy to the local database.
631 * @param bs the bitset (an RELBitSet) of log records to fetch
633 private void replicateDRLogs(RLEBitSet bs) {
634 String url = URLUtilities.generatePeerLogsURL();
635 HttpPost post = new HttpPost(url);
637 String t = bs.toString();
638 HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));
639 post.setEntity(body);
640 if (logger.isDebugEnabled()) {
641 logger.debug("Requesting records: " + t);
644 HttpResponse response = httpclient.execute(post);
645 int code = response.getStatusLine().getStatusCode();
646 if (code != HttpServletResponse.SC_OK) {
647 logger.warn("PROV5010: replicateDRLogs failed, bad error code: " + code);
650 HttpEntity entity = response.getEntity();
651 String ctype = entity.getContentType().getValue().trim();
652 if (!"text/plain".equals(ctype)) {
653 logger.warn("PROV5011: replicateDRLogs failed, bad content type: " + ctype);
657 String spoolname = "" + System.currentTimeMillis();
658 Path tmppath = Paths.get(spooldir, spoolname);
659 Path donepath = Paths.get(spooldir, "IN." + spoolname);
660 Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
661 Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
662 logger.info("Approximately " + bs.cardinality() + " records replicated.");
663 } catch (Exception e) {
664 logger.warn("PROV5012: replicateDRLogs failed, exception: " + e);
666 post.releaseConnection();