1 /*******************************************************************************
\r
2 * ============LICENSE_START==================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
25 package com.att.research.datarouter.node;
\r
30 import org.apache.log4j.Logger;
\r
32 import com.att.eelf.configuration.EELFLogger;
\r
33 import com.att.eelf.configuration.EELFManager;
\r
34 import com.att.research.datarouter.node.eelf.EelfMsgs;
\r
38 * Maintain the configuration of a Data Router node
\r
40 * The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention subsystems to access configuration information. (Log4J has its own configuration mechanism).
\r
42 * There are two basic sets of configuration data. The
\r
43 * static local configuration data, stored in a local configuration file (created
\r
44 * as part of installation by SWM), and the dynamic global
\r
45 * configuration data fetched from the data router provisioning server.
\r
47 public class NodeConfigManager implements DeliveryQueueHelper {
\r
48 private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeConfigManager");
\r
49 private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeConfigManager");
\r
50 private static NodeConfigManager base = new NodeConfigManager();
\r
52 private Timer timer = new Timer("Node Configuration Timer", true);
\r
53 private long maxfailuretimer;
\r
54 private long initfailuretimer;
\r
55 private long expirationtimer;
\r
56 private double failurebackoff;
\r
57 private long fairtimelimit;
\r
58 private int fairfilelimit;
\r
59 private double fdpstart;
\r
60 private double fdpstop;
\r
61 private int deliverythreads;
\r
62 private String provurl;
\r
63 private String provhost;
\r
64 private IsFrom provcheck;
\r
66 private int svcport;
\r
68 private String spooldir;
\r
69 private String logdir;
\r
70 private long logretention;
\r
71 private String redirfile;
\r
72 private String kstype;
\r
73 private String ksfile;
\r
74 private String kspass;
\r
75 private String kpass;
\r
76 private String tstype;
\r
77 private String tsfile;
\r
78 private String tspass;
\r
79 private String myname;
\r
80 private RedirManager rdmgr;
\r
81 private RateLimitedOperation pfetcher;
\r
82 private NodeConfig config;
\r
83 private File quiesce;
\r
84 private PublishId pid;
\r
86 private TaskList configtasks = new TaskList();
\r
87 private String eventlogurl;
\r
88 private String eventlogprefix;
\r
89 private String eventlogsuffix;
\r
90 private String eventloginterval;
\r
91 private boolean followredirects;
\r
95 * Get the default node configuration manager
\r
97 public static NodeConfigManager getInstance() {
\r
101 * Initialize the configuration of a Data Router node
\r
103 private NodeConfigManager() {
\r
104 Properties p = new Properties();
\r
106 p.load(new FileInputStream(System.getProperty("com.att.research.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties")));
\r
107 } catch (Exception e) {
\r
109 NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
\r
110 eelflogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR);
\r
111 logger.error("NODE0301 Unable to load local configuration file " + System.getProperty("com.att.research.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties"), e);
\r
113 provurl = p.getProperty("ProvisioningURL", "https://feeds-drtr.web.att.com/internal/prov");
\r
115 provhost = (new URL(provurl)).getHost();
\r
116 } catch (Exception e) {
\r
117 NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
\r
118 eelflogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, provurl);
\r
119 logger.error("NODE0302 Bad provisioning server URL " + provurl);
\r
122 logger.info("NODE0303 Provisioning server is " + provhost);
\r
123 eventlogurl = p.getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs");
\r
124 provcheck = new IsFrom(provhost);
\r
125 gfport = Integer.parseInt(p.getProperty("IntHttpPort", "8080"));
\r
126 svcport = Integer.parseInt(p.getProperty("IntHttpsPort", "8443"));
\r
127 port = Integer.parseInt(p.getProperty("ExtHttpsPort", "443"));
\r
128 long minpfinterval = Long.parseLong(p.getProperty("MinProvFetchInterval", "10000"));
\r
129 long minrsinterval = Long.parseLong(p.getProperty("MinRedirSaveInterval", "10000"));
\r
130 spooldir = p.getProperty("SpoolDir", "spool");
\r
131 File fdir = new File(spooldir + "/f");
\r
133 for (File junk: fdir.listFiles()) {
\r
134 if (junk.isFile()) {
\r
138 logdir = p.getProperty("LogDir", "logs");
\r
139 (new File(logdir)).mkdirs();
\r
140 logretention = Long.parseLong(p.getProperty("LogRetention", "30")) * 86400000L;
\r
141 eventlogprefix = logdir + "/events";
\r
142 eventlogsuffix = ".log";
\r
143 String redirfile = p.getProperty("RedirectionFile", "etc/redirections.dat");
\r
144 kstype = p.getProperty("KeyStoreType", "jks");
\r
145 ksfile = p.getProperty("KeyStoreFile", "etc/keystore");
\r
146 kspass = p.getProperty("KeyStorePassword", "changeme");
\r
147 kpass = p.getProperty("KeyPassword", "changeme");
\r
148 tstype = p.getProperty("TrustStoreType", "jks");
\r
149 tsfile = p.getProperty("TrustStoreFile");
\r
150 tspass = p.getProperty("TrustStorePassword", "changeme");
\r
151 if (tsfile != null && tsfile.length() > 0) {
\r
152 System.setProperty("javax.net.ssl.trustStoreType", tstype);
\r
153 System.setProperty("javax.net.ssl.trustStore", tsfile);
\r
154 System.setProperty("javax.net.ssl.trustStorePassword", tspass);
\r
156 nak = p.getProperty("NodeAuthKey", "Node123!");
\r
157 quiesce = new File(p.getProperty("QuiesceFile", "etc/SHUTDOWN"));
\r
158 myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass);
\r
159 if (myname == null) {
\r
160 NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
\r
161 eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile);
\r
162 logger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile);
\r
165 logger.info("NODE0304 My certificate says my name is " + myname);
\r
166 pid = new PublishId(myname);
\r
167 rdmgr = new RedirManager(redirfile, minrsinterval, timer);
\r
168 pfetcher = new RateLimitedOperation(minpfinterval, timer) {
\r
169 public void run() {
\r
173 logger.info("NODE0305 Attempting to fetch configuration at " + provurl);
\r
174 pfetcher.request();
\r
176 private void localconfig() {
\r
177 followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
\r
178 eventloginterval = getProvParam("LOGROLL_INTERVAL", "5m");
\r
179 initfailuretimer = 10000;
\r
180 maxfailuretimer = 3600000;
\r
181 expirationtimer = 86400000;
\r
182 failurebackoff = 2.0;
\r
183 deliverythreads = 40;
\r
184 fairfilelimit = 100;
\r
185 fairtimelimit = 60000;
\r
188 try { initfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000); } catch (Exception e) {}
\r
189 try { maxfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000); } catch (Exception e) {}
\r
190 try { expirationtimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000); } catch (Exception e) {}
\r
191 try { failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO")); } catch (Exception e) {}
\r
192 try { deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS")); } catch (Exception e) {}
\r
193 try { fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT")); } catch (Exception e) {}
\r
194 try { fairtimelimit = (long)(Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000); } catch (Exception e) {}
\r
195 try { fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0; } catch (Exception e) {}
\r
196 try { fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0; } catch (Exception e) {}
\r
197 if (fdpstart < 0.01) {
\r
200 if (fdpstart > 0.5) {
\r
203 if (fdpstop < fdpstart) {
\r
204 fdpstop = fdpstart;
\r
206 if (fdpstop > 0.5) {
\r
210 private void fetchconfig() {
\r
212 System.out.println("provurl:: "+provurl);
\r
213 Reader r = new InputStreamReader((new URL(provurl)).openStream());
\r
214 config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak);
\r
216 configtasks.startRun();
\r
218 while ((rr = configtasks.next()) != null) {
\r
221 } catch (Exception e) {
\r
224 } catch (Exception e) {
\r
225 e.printStackTrace();
\r
226 NodeUtils.setIpAndFqdnForEelf("fetchconfigs");
\r
227 eelflogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString());
\r
228 logger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e);
\r
229 pfetcher.request();
\r
233 * Process a gofetch request from a particular IP address. If the
\r
234 * IP address is not an IP address we would go to to fetch the
\r
235 * provisioning data, ignore the request. If the data has been
\r
236 * fetched very recently (default 10 seconds), wait a while before fetching again.
\r
238 public synchronized void gofetch(String remoteaddr) {
\r
239 if (provcheck.isFrom(remoteaddr)) {
\r
240 logger.info("NODE0307 Received configuration fetch request from provisioning server " + remoteaddr);
\r
241 pfetcher.request();
\r
243 logger.info("NODE0308 Received configuration fetch request from unexpected server " + remoteaddr);
\r
249 public boolean isConfigured() {
\r
250 return(config != null);
\r
255 public boolean isShutdown() {
\r
256 return(quiesce.exists());
\r
259 * Given a routing string, get the targets.
\r
260 * @param routing Target string
\r
261 * @return array of targets
\r
263 public Target[] parseRouting(String routing) {
\r
264 return(config.parseRouting(routing));
\r
267 * Given a set of credentials and an IP address, is this request from another node?
\r
268 * @param credentials Credentials offered by the supposed node
\r
269 * @param ip IP address the request came from
\r
270 * @return If the credentials and IP address are recognized, true, otherwise false.
\r
272 public boolean isAnotherNode(String credentials, String ip) {
\r
273 return(config.isAnotherNode(credentials, ip));
\r
276 * Check whether publication is allowed.
\r
277 * @param feedid The ID of the feed being requested
\r
278 * @param credentials The offered credentials
\r
279 * @param ip The requesting IP address
\r
280 * @return True if the IP and credentials are valid for the specified feed.
\r
282 public String isPublishPermitted(String feedid, String credentials, String ip) {
\r
283 return(config.isPublishPermitted(feedid, credentials, ip));
\r
286 * Check who the user is given the feed ID and the offered credentials.
\r
287 * @param feedid The ID of the feed specified
\r
288 * @param credentials The offered credentials
\r
289 * @return Null if the credentials are invalid or the user if they are valid.
\r
291 public String getAuthUser(String feedid, String credentials) {
\r
292 return(config.getAuthUser(feedid, credentials));
\r
295 * Check if the publish request should be sent to another node based on the feedid, user, and source IP address.
\r
296 * @param feedid The ID of the feed specified
\r
297 * @param user The publishing user
\r
298 * @param ip The IP address of the publish endpoint
\r
299 * @return Null if the request should be accepted or the correct hostname if it should be sent to another node.
\r
301 public String getIngressNode(String feedid, String user, String ip) {
\r
302 return(config.getIngressNode(feedid, user, ip));
\r
305 * Get a provisioned configuration parameter (from the provisioning server configuration)
\r
306 * @param name The name of the parameter
\r
307 * @return The value of the parameter or null if it is not defined.
\r
309 public String getProvParam(String name) {
\r
310 return(config.getProvParam(name));
\r
313 * Get a provisioned configuration parameter (from the provisioning server configuration)
\r
314 * @param name The name of the parameter
\r
315 * @param deflt The value to use if the parameter is not defined
\r
316 * @return The value of the parameter or deflt if it is not defined.
\r
318 public String getProvParam(String name, String deflt) {
\r
319 name = config.getProvParam(name);
\r
320 if (name == null) {
\r
326 * Generate a publish ID
\r
328 public String getPublishId() {
\r
329 return(pid.next());
\r
332 * Get all the outbound spooling destinations.
\r
333 * This will include both subscriptions and nodes.
\r
335 public DestInfo[] getAllDests() {
\r
336 return(config.getAllDests());
\r
339 * Register a task to run whenever the configuration changes
\r
341 public void registerConfigTask(Runnable task) {
\r
342 configtasks.addTask(task);
\r
345 * Deregister a task to run whenever the configuration changes
\r
347 public void deregisterConfigTask(Runnable task) {
\r
348 configtasks.removeTask(task);
\r
351 * Get the URL to deliver a message to.
\r
352 * @param destinfo The destination information
\r
353 * @param fileid The file ID
\r
354 * @return The URL to deliver to
\r
356 public String getDestURL(DestInfo destinfo, String fileid) {
\r
357 String subid = destinfo.getSubId();
\r
358 String purl = destinfo.getURL();
\r
359 if (followredirects && subid != null) {
\r
360 purl = rdmgr.lookup(subid, purl);
\r
362 return(purl + "/" + fileid);
\r
365 * Is a destination redirected?
\r
367 public boolean isDestRedirected(DestInfo destinfo) {
\r
368 return(followredirects && rdmgr.isRedirected(destinfo.getSubId()));
\r
371 * Set up redirection on receipt of a 3XX from a target URL
\r
373 public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) {
\r
374 fileid = "/" + fileid;
\r
375 String subid = destinfo.getSubId();
\r
376 String purl = destinfo.getURL();
\r
377 if (followredirects && subid != null && redirto.endsWith(fileid)) {
\r
378 redirto = redirto.substring(0, redirto.length() - fileid.length());
\r
379 if (!redirto.equals(purl)) {
\r
380 rdmgr.redirect(subid, purl, redirto);
\r
387 * Handle unreachable target URL
\r
389 public void handleUnreachable(DestInfo destinfo) {
\r
390 String subid = destinfo.getSubId();
\r
391 if (followredirects && subid != null) {
\r
392 rdmgr.forget(subid);
\r
396 * Get the timeout before retrying after an initial delivery failure
\r
398 public long getInitFailureTimer() {
\r
399 return(initfailuretimer);
\r
402 * Get the maximum timeout between delivery attempts
\r
404 public long getMaxFailureTimer() {
\r
405 return(maxfailuretimer);
\r
408 * Get the ratio between consecutive delivery attempts
\r
410 public double getFailureBackoff() {
\r
411 return(failurebackoff);
\r
414 * Get the expiration timer for deliveries
\r
416 public long getExpirationTimer() {
\r
417 return(expirationtimer);
\r
420 * Get the maximum number of file delivery attempts before checking
\r
421 * if another queue has work to be performed.
\r
423 public int getFairFileLimit() {
\r
424 return(fairfilelimit);
\r
427 * Get the maximum amount of time spent delivering files before
\r
428 * checking if another queue has work to be performed.
\r
430 public long getFairTimeLimit() {
\r
431 return(fairtimelimit);
\r
434 * Get the targets for a feed
\r
435 * @param feedid The feed ID
\r
436 * @return The targets this feed should be delivered to
\r
438 public Target[] getTargets(String feedid) {
\r
439 return(config.getTargets(feedid));
\r
442 * Get the spool directory for temporary files
\r
444 public String getSpoolDir() {
\r
445 return(spooldir + "/f");
\r
448 * Get the base directory for spool directories
\r
450 public String getSpoolBase() {
\r
454 * Get the key store type
\r
456 public String getKSType() {
\r
460 * Get the key store file
\r
462 public String getKSFile() {
\r
466 * Get the key store password
\r
468 public String getKSPass() {
\r
472 * Get the key password
\r
474 public String getKPass() {
\r
478 * Get the http port
\r
480 public int getHttpPort() {
\r
484 * Get the https port
\r
486 public int getHttpsPort() {
\r
490 * Get the externally visible https port
\r
492 public int getExtHttpsPort() {
\r
496 * Get the external name of this machine
\r
498 public String getMyName() {
\r
502 * Get the number of threads to use for delivery
\r
504 public int getDeliveryThreads() {
\r
505 return(deliverythreads);
\r
508 * Get the URL for uploading the event log data
\r
510 public String getEventLogUrl() {
\r
511 return(eventlogurl);
\r
514 * Get the prefix for the names of event log files
\r
516 public String getEventLogPrefix() {
\r
517 return(eventlogprefix);
\r
520 * Get the suffix for the names of the event log files
\r
522 public String getEventLogSuffix() {
\r
523 return(eventlogsuffix);
\r
526 * Get the interval between event log file rollovers
\r
528 public String getEventLogInterval() {
\r
529 return(eventloginterval);
\r
532 * Should I follow redirects from subscribers?
\r
534 public boolean isFollowRedirects() {
\r
535 return(followredirects);
\r
538 * Get the directory where the event and node log files live
\r
540 public String getLogDir() {
\r
544 * How long do I keep log files (in milliseconds)
\r
546 public long getLogRetention() {
\r
547 return(logretention);
\r
552 public Timer getTimer() {
\r
556 * Get the feed ID for a subscription
\r
557 * @param subid The subscription ID
\r
558 * @return The feed ID
\r
560 public String getFeedId(String subid) {
\r
561 return(config.getFeedId(subid));
\r
564 * Get the authorization string this node uses
\r
565 * @return The Authorization string for this node
\r
567 public String getMyAuth() {
\r
568 return(config.getMyAuth());
\r
571 * Get the fraction of free spool disk space where we start throwing away undelivered files. This is FREE_DISK_RED_PERCENT / 100.0. Default is 0.05. Limited by 0.01 <= FreeDiskStart <= 0.5.
\r
573 public double getFreeDiskStart() {
\r
577 * Get the fraction of free spool disk space where we stop throwing away undelivered files. This is FREE_DISK_YELLOW_PERCENT / 100.0. Default is 0.2. Limited by FreeDiskStart <= FreeDiskStop <= 0.5.
\r
579 public double getFreeDiskStop() {
\r
583 * Get the spool directory for a subscription
\r
585 public String getSpoolDir(String subid, String remoteaddr) {
\r
586 if (provcheck.isFrom(remoteaddr)) {
\r
587 String sdir = config.getSpoolDir(subid);
\r
588 if (sdir != null) {
\r
589 logger.info("NODE0310 Received subscription reset request for subscription " + subid + " from provisioning server " + remoteaddr);
\r
591 logger.info("NODE0311 Received subscription reset request for unknown subscription " + subid + " from provisioning server " + remoteaddr);
\r
595 logger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr);
\r