1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.provisioning.utils;
27 import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT;
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
31 import java.io.ByteArrayOutputStream;
32 import java.io.FileInputStream;
33 import java.io.InputStream;
34 import java.net.InetAddress;
35 import java.net.UnknownHostException;
36 import java.nio.file.Files;
37 import java.nio.file.Path;
38 import java.nio.file.Paths;
39 import java.nio.file.StandardCopyOption;
40 import java.security.KeyStore;
41 import java.sql.Connection;
42 import java.sql.SQLException;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collection;
46 import java.util.HashMap;
49 import java.util.Timer;
50 import java.util.TimerTask;
51 import java.util.TreeSet;
52 import jakarta.servlet.http.HttpServletResponse;
53 import org.apache.http.HttpEntity;
54 import org.apache.http.HttpResponse;
55 import org.apache.http.client.methods.HttpGet;
56 import org.apache.http.client.methods.HttpPost;
57 import org.apache.http.conn.scheme.PlainSocketFactory;
58 import org.apache.http.conn.scheme.Scheme;
59 import org.apache.http.conn.ssl.SSLSocketFactory;
60 import org.apache.http.entity.ByteArrayEntity;
61 import org.apache.http.entity.ContentType;
62 import org.apache.http.impl.client.AbstractHttpClient;
63 import org.apache.http.impl.client.DefaultHttpClient;
64 import org.json.JSONArray;
65 import org.json.JSONException;
66 import org.json.JSONObject;
67 import org.json.JSONTokener;
68 import org.onap.dmaap.datarouter.provisioning.BaseServlet;
69 import org.onap.dmaap.datarouter.provisioning.ProvRunner;
70 import org.onap.dmaap.datarouter.provisioning.beans.EgressRoute;
71 import org.onap.dmaap.datarouter.provisioning.beans.Feed;
72 import org.onap.dmaap.datarouter.provisioning.beans.Group;
73 import org.onap.dmaap.datarouter.provisioning.beans.IngressRoute;
74 import org.onap.dmaap.datarouter.provisioning.beans.NetworkRoute;
75 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
76 import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
77 import org.onap.dmaap.datarouter.provisioning.beans.Syncable;
80 * This class handles synchronization between provisioning servers (PODs). It has three primary functions:
82 * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
83 * the active (master) POD.</li>
84 * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
85 * <li>Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD)
88 * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
90 * Security.setProperty("networkaddress.cache.ttl", "10");
94 * @version $Id: SynchronizerTask.java,v 1.10 2014/03/21 13:50:10 eby Exp $
97 public class SynchronizerTask extends TimerTask {
100 * This is a singleton -- there is only one SynchronizerTask object in the server.
102 private static SynchronizerTask synctask;
105 * This POD is unknown -- not on the list of PODs.
107 public static final int UNKNOWN_POD = 0;
109 * This POD is active -- on the list of PODs, and the DNS CNAME points to us.
111 public static final int ACTIVE_POD = 1;
113 * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us.
115 public static final int STANDBY_POD = 2;
117 private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"};
118 private static final long ONE_HOUR = 60 * 60 * 1000L;
120 private long nextMsg = 0; // only display the "Current podState" msg every 5 mins.
122 private final EELFLogger logger;
123 private final Timer rolex;
124 private final String spooldir;
125 private int podState;
126 private boolean doFetch;
127 private long nextsynctime;
128 private AbstractHttpClient httpclient = null;
130 @SuppressWarnings("deprecation")
131 private SynchronizerTask() {
132 logger = EELFManager.getInstance().getLogger("InternalLog");
134 spooldir = ProvRunner.getProvProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
135 podState = UNKNOWN_POD;
136 doFetch = true; // start off with a fetch
139 logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD");
140 try (AbstractHttpClient hc = new DefaultHttpClient()) {
142 if (Boolean.TRUE.equals(ProvRunner.getTlsEnabled())) {
144 String type = AafPropsUtils.KEYSTORE_TYPE_PROPERTY;
145 String store = ProvRunner.getAafPropsUtils().getKeystorePathProperty();
146 String pass = ProvRunner.getAafPropsUtils().getKeystorePassProperty();
147 KeyStore keyStore = KeyStore.getInstance(type);
148 try (FileInputStream instream = new FileInputStream(store)) {
149 keyStore.load(instream, pass.toCharArray());
153 store = ProvRunner.getAafPropsUtils().getTruststorePathProperty();
154 pass = ProvRunner.getAafPropsUtils().getTruststorePassProperty();
155 KeyStore trustStore = null;
156 if (store != null && store.length() > 0) {
157 trustStore = KeyStore.getInstance(AafPropsUtils.TRUESTSTORE_TYPE_PROPERTY);
158 try (FileInputStream instream = new FileInputStream(store)) {
159 trustStore.load(instream, pass.toCharArray());
163 // We are connecting with the node name, but the certificate will have the CNAME
164 // So we need to accept a non-matching certificate name
165 String keystorepass = ProvRunner.getAafPropsUtils().getKeystorePassProperty();
166 SSLSocketFactory socketFactory =
168 ? new SSLSocketFactory(keyStore, keystorepass)
169 : new SSLSocketFactory(keyStore, keystorepass, trustStore);
170 socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
171 sch = new Scheme("https", 443, socketFactory);
173 PlainSocketFactory socketFactory = new PlainSocketFactory();
174 sch = new Scheme("http", 80, socketFactory);
176 hc.getConnectionManager().getSchemeRegistry().register(sch);
178 setSynchTimer(ProvRunner.getProvProperties().getProperty(
179 "org.onap.dmaap.datarouter.provserver.sync_interval", "5000"));
180 } catch (Exception e) {
181 logger.warn("PROV5005: Problem starting the synchronizer: " + e);
185 private void setSynchTimer(String strInterval) {
186 // Run once every 5 seconds to check DNS, etc.
189 interval = Long.parseLong(strInterval);
190 } catch (NumberFormatException e) {
193 rolex.scheduleAtFixedRate(this, 0L, interval);
197 * Get the singleton SynchronizerTask object.
199 * @return the SynchronizerTask
201 public static synchronized SynchronizerTask getSynchronizer() {
202 if (synctask == null) {
203 synctask = new SynchronizerTask();
209 * What is the podState of this POD?.
211 * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD
213 public int getPodState() {
218 * Is this the active POD?.
220 * @return true if we are active (the master), false otherwise
222 public boolean isActive() {
223 return podState == ACTIVE_POD;
227 * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
228 * should re-synchronize with the master.
230 public void doFetch() {
235 * Runs once a minute in order to <ol>
236 * <li>lookup DNS names,</li>
237 * <li>determine the podState of this POD,</li>
238 * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.</li>
239 * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
245 podState = lookupState();
246 if (podState == STANDBY_POD) {
247 // Only copy provisioning data FROM the active server TO the standby
248 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
249 syncProvisioningData();
250 logger.info("PROV5013: Sync completed.");
251 nextsynctime = System.currentTimeMillis() + ONE_HOUR;
254 // Don't do fetches on non-standby PODs
258 // Fetch DR logs as needed - server to server
259 LogfileLoader lfl = LogfileLoader.getLoader();
261 // Only fetch new logs if the loader is waiting for them.
262 logger.trace("Checking for logs to replicate...");
263 RLEBitSet local = lfl.getBitSet();
264 RLEBitSet remote = readRemoteLoglist();
265 remote.andNot(local);
266 if (!remote.isEmpty()) {
267 logger.debug(" Replicating logs: " + remote);
268 replicateDataRouterLogs(remote);
271 } catch (Exception e) {
272 logger.warn("PROV0020: Caught exception in SynchronizerTask: " + e);
276 private void syncProvisioningData() {
277 logger.debug("Initiating a sync...");
278 JSONObject jo = readProvisioningJson();
281 syncFeeds(jo.getJSONArray("feeds"));
282 syncSubs(jo.getJSONArray("subscriptions"));
283 syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
284 syncParams(jo.getJSONObject("parameters"));
285 // The following will not be present in a version=1.0 provfeed
286 JSONArray ja = jo.optJSONArray("ingress");
288 syncIngressRoutes(ja);
290 JSONObject j2 = jo.optJSONObject("egress");
292 syncEgressRoutes(j2);
294 ja = jo.optJSONArray("routing");
296 syncNetworkRoutes(ja);
302 * This method is used to lookup the CNAME that points to the active server.
303 * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server.
305 * @return the current podState
307 public int lookupState() {
308 int newPodState = UNKNOWN_POD;
310 InetAddress myaddr = InetAddress.getLocalHost();
311 if (logger.isTraceEnabled()) {
312 logger.trace("My address: " + myaddr);
314 String thisPod = myaddr.getHostName();
315 Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
316 if (pods.contains(thisPod)) {
317 InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
318 newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD;
319 if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
320 logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]);
321 nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
324 logger.warn("PROV5003: My name (" + thisPod + ") is missing from the list of provisioning servers.");
326 } catch (UnknownHostException e) {
327 logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
330 if (newPodState != podState) {
331 logger.info(String.format("PROV5001: Server podState changed from %s to %s",
332 stnames[podState], stnames[newPodState]));
338 * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
340 private void syncFeeds(JSONArray ja) {
341 Collection<Syncable> coll = new ArrayList<>();
342 for (int n = 0; n < ja.length(); n++) {
344 Feed feed = new Feed(ja.getJSONObject(n));
346 } catch (Exception e) {
347 logger.warn("PROV5004: Invalid object in feed: " + ja.optJSONObject(n), e);
350 if (sync(coll, Feed.getAllFeeds())) {
351 BaseServlet.provisioningDataChanged();
356 * Synchronize the Subscriptions in the JSONArray, with the Subscriptions in the DB.
358 private void syncSubs(JSONArray ja) {
359 Collection<Syncable> coll = new ArrayList<>();
360 for (int n = 0; n < ja.length(); n++) {
362 //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
363 JSONObject jsonObject = ja.getJSONObject(n);
364 jsonObject.put("sync", "true");
365 Subscription sub = new Subscription(jsonObject);
367 } catch (Exception e) {
368 logger.warn("PROV5004: Invalid object in subscription: " + ja.optJSONObject(n), e);
371 if (sync(coll, Subscription.getAllSubscriptions())) {
372 BaseServlet.provisioningDataChanged();
377 * Rally:US708115 - Synchronize the Groups in the JSONArray, with the Groups in the DB.
379 private void syncGroups(JSONArray ja) {
380 Collection<Syncable> coll = new ArrayList<>();
381 for (int n = 0; n < ja.length(); n++) {
383 Group group = new Group(ja.getJSONObject(n));
385 } catch (Exception e) {
386 logger.warn("PROV5004: Invalid object in group: " + ja.optJSONObject(n), e);
389 if (sync(coll, Group.getAllgroups())) {
390 BaseServlet.provisioningDataChanged();
396 * Synchronize the Parameters in the JSONObject, with the Parameters in the DB.
398 private void syncParams(JSONObject jo) {
399 Collection<Syncable> coll = new ArrayList<>();
400 for (String k : jo.keySet()) {
403 val = jo.getString(k);
404 } catch (JSONException e) {
405 logger.warn("PROV5004: Invalid object in parameters: " + jo.optJSONObject(k), e);
407 val = "" + jo.getInt(k);
408 } catch (JSONException e1) {
409 logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1);
410 JSONArray ja = jo.getJSONArray(k);
411 for (int i = 0; i < ja.length(); i++) {
415 val += ja.getString(i);
419 coll.add(new Parameters(k, val));
421 if (sync(coll, Parameters.getParameterCollection())) {
422 BaseServlet.provisioningDataChanged();
423 BaseServlet.provisioningParametersChanged();
427 private void syncIngressRoutes(JSONArray ja) {
428 Collection<Syncable> coll = new ArrayList<>();
429 for (int n = 0; n < ja.length(); n++) {
431 IngressRoute in = new IngressRoute(ja.getJSONObject(n));
433 } catch (NumberFormatException e) {
434 logger.warn("PROV5004: Invalid object in ingress routes: " + ja.optJSONObject(n));
437 if (sync(coll, IngressRoute.getAllIngressRoutes())) {
438 BaseServlet.provisioningDataChanged();
442 private void syncEgressRoutes(JSONObject jo) {
443 Collection<Syncable> coll = new ArrayList<>();
444 for (String key : jo.keySet()) {
446 int sub = Integer.parseInt(key);
447 String node = jo.getString(key);
448 EgressRoute er = new EgressRoute(sub, node);
450 } catch (NumberFormatException e) {
451 logger.warn("PROV5004: Invalid subid in egress routes: " + key, e);
452 } catch (IllegalArgumentException e) {
453 logger.warn("PROV5004: Invalid node name in egress routes: " + key, e);
456 if (sync(coll, EgressRoute.getAllEgressRoutes())) {
457 BaseServlet.provisioningDataChanged();
461 private void syncNetworkRoutes(JSONArray ja) {
462 Collection<Syncable> coll = new ArrayList<>();
463 for (int n = 0; n < ja.length(); n++) {
465 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
467 } catch (JSONException e) {
468 logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e);
471 if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
472 BaseServlet.provisioningDataChanged();
476 private boolean sync(Collection<? extends Syncable> newc, Collection<? extends Syncable> oldc) {
477 boolean changes = false;
479 Map<String, Syncable> newmap = getMap(newc);
480 Map<String, Syncable> oldmap = getMap(oldc);
481 Set<String> union = new TreeSet<>(newmap.keySet());
482 union.addAll(oldmap.keySet());
483 for (String n : union) {
484 Syncable newobj = newmap.get(n);
485 Syncable oldobj = oldmap.get(n);
486 if (oldobj == null) {
487 try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
488 changes = insertRecord(conn, newobj);
490 } else if (newobj == null) {
491 try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
492 changes = deleteRecord(conn, oldobj);
494 } else if (!newobj.equals(oldobj)) {
495 try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
496 changes = updateRecord(conn, newobj, oldobj);
500 } catch (SQLException e) {
501 logger.warn("PROV5009: problem during sync, exception: " + e);
506 private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) {
507 if (logger.isDebugEnabled()) {
508 logger.debug(" Updating record: " + newobj);
510 boolean changes = newobj.doUpdate(conn);
511 checkChangeOwner(newobj, oldobj);
516 private boolean deleteRecord(Connection conn, Syncable oldobj) {
517 if (logger.isDebugEnabled()) {
518 logger.debug(" Deleting record: " + oldobj);
520 return oldobj.doDelete(conn);
523 private boolean insertRecord(Connection conn, Syncable newobj) {
524 if (logger.isDebugEnabled()) {
525 logger.debug(" Inserting record: " + newobj);
527 return newobj.doInsert(conn);
530 private Map<String, Syncable> getMap(Collection<? extends Syncable> coll) {
531 Map<String, Syncable> map = new HashMap<>();
532 for (Syncable v : coll) {
533 map.put(v.getKey(), v);
539 * Change owner of FEED/SUBSCRIPTION.
540 * Rally US708115 Change Ownership of FEED - 1610
542 private void checkChangeOwner(Syncable newobj, Syncable oldobj) {
543 if (newobj instanceof Feed) {
544 Feed oldfeed = (Feed) oldobj;
545 Feed newfeed = (Feed) newobj;
547 if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
548 logger.info("PROV5013 - Previous publisher: "
549 + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher());
550 oldfeed.setPublisher(newfeed.getPublisher());
551 oldfeed.changeOwnerShip();
553 } else if (newobj instanceof Subscription) {
554 Subscription oldsub = (Subscription) oldobj;
555 Subscription newsub = (Subscription) newobj;
557 if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
558 logger.info("PROV5013 - Previous subscriber: "
559 + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber());
560 oldsub.setSubscriber(newsub.getSubscriber());
561 oldsub.changeOwnerShip();
568 * Issue a GET on the peer POD's /internal/prov/ URL to get a copy of its provisioning data.
570 * @return the provisioning data (as a JONObject)
572 private synchronized JSONObject readProvisioningJson() {
573 String url = URLUtilities.generatePeerProvURL();
574 HttpGet get = new HttpGet(url);
576 HttpResponse response = httpclient.execute(get);
577 int code = response.getStatusLine().getStatusCode();
578 if (code != HttpServletResponse.SC_OK) {
579 logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code);
582 HttpEntity entity = response.getEntity();
583 String ctype = entity.getContentType().getValue().trim();
584 if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1)
585 && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
586 logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype);
589 return new JSONObject(new JSONTokener(entity.getContent()));
590 } catch (Exception e) {
591 logger.warn("PROV5012: readProvisioningJson failed, exception: " + e);
594 get.releaseConnection();
599 * Issue a GET on the peer POD's /internal/drlogs/ URL to get an RELBitSet representing the log records available in
600 * the remote database.
604 public RLEBitSet readRemoteLoglist() {
605 RLEBitSet bs = new RLEBitSet();
606 String url = URLUtilities.generatePeerLogsURL();
608 //Fixing if only one Prov is configured, not to give exception to fill logs, return empty bitset.
609 if ("".equals(url)) {
614 HttpGet get = new HttpGet(url);
615 try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
616 HttpResponse response = httpclient.execute(get);
617 int code = response.getStatusLine().getStatusCode();
618 if (code != HttpServletResponse.SC_OK) {
619 logger.warn("PROV5010: readRemoteLoglist failed, bad error code: " + code);
622 HttpEntity entity = response.getEntity();
623 String ctype = entity.getContentType().getValue().trim();
624 if (!TEXT_CT.equals(ctype)) {
625 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
628 InputStream is = entity.getContent();
630 while ((ch = is.read()) >= 0) {
633 bs.set(bos.toString());
635 } catch (Exception e) {
636 logger.warn("PROV5012: readRemoteLoglist failed, exception: " + e);
639 get.releaseConnection();
645 * Issue a POST on the peer POD's /internal/drlogs/ URL to fetch log records available in the remote database that
646 * we wish to copy to the local database.
648 * @param bs the bitset (an RELBitSet) of log records to fetch
650 public void replicateDataRouterLogs(RLEBitSet bs) {
651 String url = URLUtilities.generatePeerLogsURL();
652 HttpPost post = new HttpPost(url);
654 String str = bs.toString();
655 HttpEntity body = new ByteArrayEntity(str.getBytes(), ContentType.create(TEXT_CT));
656 post.setEntity(body);
657 if (logger.isDebugEnabled()) {
658 logger.debug("Requesting records: " + str);
661 HttpResponse response = httpclient.execute(post);
662 int code = response.getStatusLine().getStatusCode();
663 if (code != HttpServletResponse.SC_OK) {
664 logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code);
667 HttpEntity entity = response.getEntity();
668 String ctype = entity.getContentType().getValue().trim();
669 if (!TEXT_CT.equals(ctype)) {
670 logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype);
674 String spoolname = "" + System.currentTimeMillis();
675 Path tmppath = Paths.get(spooldir, spoolname);
676 Path donepath = Paths.get(spooldir, "IN." + spoolname);
677 Files.copy(entity.getContent(), Paths.get(spooldir, spoolname), StandardCopyOption.REPLACE_EXISTING);
678 Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
679 logger.info("Approximately " + bs.cardinality() + " records replicated.");
680 } catch (Exception e) {
681 logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e);
683 post.releaseConnection();