1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.node;
27 import static java.lang.System.exit;
28 import static java.lang.System.getProperty;
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
33 import java.io.FileInputStream;
34 import java.io.IOException;
35 import java.io.InputStreamReader;
36 import java.io.Reader;
38 import java.nio.file.Files;
39 import java.util.HashSet;
40 import java.util.Iterator;
41 import java.util.Objects;
42 import java.util.Properties;
43 import java.util.Timer;
44 import org.onap.dmaap.datarouter.node.config.NodeConfig;
45 import org.onap.dmaap.datarouter.node.config.ProvData;
46 import org.onap.dmaap.datarouter.node.delivery.DeliveryQueueHelper;
47 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
48 import org.onap.dmaap.datarouter.node.utils.NodeTlsManager;
49 import org.onap.dmaap.datarouter.node.utils.NodeUtils;
/**
 * Maintain the configuration of a Data Router node.
 *
 * <p>The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention
 * subsystems to access configuration information.
 *
 * <p>There are two basic sets of configuration data: the static local configuration data, stored in a local
 * configuration file (created as part of installation by SWM), and the dynamic global configuration data fetched from
 * the data router provisioning server.
 */
62 public class NodeConfigManager implements DeliveryQueueHelper {
64 private static final String NODE_CONFIG_MANAGER = "NodeConfigManager";
65 private static final EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeConfigManager.class);
66 private long maxfailuretimer;
67 private long initfailuretimer;
68 private long waitForFileProcessFailureTimer;
69 private long expirationtimer;
70 private double failurebackoff;
71 private long fairtimelimit;
72 private int fairfilelimit;
73 private double fdpstart;
74 private double fdpstop;
75 private int deliverythreads;
76 private final String provurl;
77 private String provhost;
78 private final int intHttpPort;
79 private final int intHttpsPort;
80 private final int extHttpsPort;
81 private final boolean tlsEnabled;
82 private String myname;
83 private final String nak;
84 private final File quiesce;
85 private final String spooldir;
86 private final String logdir;
87 private final long logretention;
88 private final String eventlogurl;
89 private final String eventlogprefix;
90 private final String eventlogsuffix;
91 private String eventloginterval;
92 private boolean followredirects;
93 private final TaskList configtasks = new TaskList();
94 private final PublishId publishId;
95 private final IsFrom provcheck;
96 private final RedirManager rdmgr;
97 private final Timer timer = new Timer("Node Configuration Timer", true);
98 private final RateLimitedOperation pfetcher;
99 private static NodeConfigManager base;
100 private static NodeTlsManager nodeTlsManager;
101 private NodeConfig nodeConfig;
102 private static Properties drNodeProperties;
104 public static Properties getDrNodeProperties() {
105 if (drNodeProperties == null) {
106 try (FileInputStream props = new FileInputStream(getProperty(
107 "org.onap.dmaap.datarouter.node.properties",
108 "/opt/app/datartr/etc/node.properties"))) {
109 drNodeProperties = new Properties();
110 drNodeProperties.load(props);
111 } catch (IOException e) {
112 eelfLogger.error("Failed to load NODE properties: " + e.getMessage(), e);
116 return drNodeProperties;
119 * Initialize the configuration of a Data Router node.
121 private NodeConfigManager() {
122 provurl = getDrNodeProperties().getProperty("ProvisioningURL", "http://dmaap-dr-prov:8080/internal/prov");
124 provhost = (new URL(provurl)).getHost();
125 } catch (Exception e) {
126 NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
127 eelfLogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, e, provurl);
130 eelfLogger.debug("NODE0303 Provisioning server is at: " + provhost);
131 provcheck = new IsFrom(provhost);
132 tlsEnabled = Boolean.parseBoolean(getDrNodeProperties().getProperty("TlsEnabled", "true"));
133 if (isTlsEnabled()) {
135 nodeTlsManager = new NodeTlsManager(getDrNodeProperties());
136 myname = nodeTlsManager.getMyNameFromCertificate();
137 if (myname == null) {
138 NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
139 eelfLogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, nodeTlsManager.getKeyStorefile());
140 eelfLogger.error("NODE0309 Unable to fetch canonical name from keystore file {}", nodeTlsManager.getKeyStorefile());
143 eelfLogger.debug("NODE0304 My certificate says my name is {}", myname);
144 } catch (Exception e) {
145 eelfLogger.error("NODE0314 Failed to set up TLS config. Exiting", e);
149 myname = "dmaap-dr-node";
150 eventlogurl = getDrNodeProperties().getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs");
151 intHttpPort = Integer.parseInt(getDrNodeProperties().getProperty("IntHttpPort", "80"));
152 intHttpsPort = Integer.parseInt(getDrNodeProperties().getProperty("IntHttpsPort", "443"));
153 extHttpsPort = Integer.parseInt(getDrNodeProperties().getProperty("ExtHttpsPort", "443"));
154 spooldir = getDrNodeProperties().getProperty("SpoolDir", "spool");
156 File fdir = new File(spooldir + "/f");
158 for (File junk : Objects.requireNonNull(fdir.listFiles())) {
160 Files.deleteIfExists(junk.toPath());
161 } catch (IOException e) {
162 eelfLogger.error("NODE0313 Failed to clear junk files from " + fdir.getPath(), e);
165 logdir = getDrNodeProperties().getProperty("LogDir", "logs");
166 (new File(logdir)).mkdirs();
167 logretention = Long.parseLong(getDrNodeProperties().getProperty("LogRetention", "30")) * 86400000L;
168 eventlogprefix = logdir + "/events";
169 eventlogsuffix = ".log";
170 String redirfile = getDrNodeProperties().getProperty("RedirectionFile", "etc/redirections.dat");
171 publishId = new PublishId(myname);
172 nak = getDrNodeProperties().getProperty("NodeAuthKey", "Node123!");
173 quiesce = new File(getDrNodeProperties().getProperty("QuiesceFile", "etc/SHUTDOWN"));
174 rdmgr = new RedirManager(redirfile,
175 Long.parseLong(getDrNodeProperties().getProperty("MinRedirSaveInterval", "10000")), timer);
176 pfetcher = new RateLimitedOperation(
177 Long.parseLong(getDrNodeProperties().getProperty("MinProvFetchInterval", "10000")), timer) {
179 fetchNodeConfigFromProv();
182 eelfLogger.debug("NODE0305 Attempting to fetch configuration at " + provurl);
187 * Get the default node configuration manager.
189 public static NodeConfigManager getInstance() {
191 base = new NodeConfigManager();
196 private void localconfig() {
197 followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
198 eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s");
199 initfailuretimer = 10000;
200 waitForFileProcessFailureTimer = 600000;
201 maxfailuretimer = 3600000;
202 expirationtimer = 86400000;
203 failurebackoff = 2.0;
204 deliverythreads = 40;
206 fairtimelimit = 60000;
210 initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000);
211 } catch (Exception e) {
212 eelfLogger.trace("Error parsing DELIVERY_INIT_RETRY_INTERVAL", e);
215 waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL"))
217 } catch (Exception e) {
218 eelfLogger.trace("Error parsing DELIVERY_FILE_PROCESS_INTERVAL", e);
221 maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
222 } catch (Exception e) {
223 eelfLogger.trace("Error parsing DELIVERY_MAX_RETRY_INTERVAL", e);
226 expirationtimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000);
227 } catch (Exception e) {
228 eelfLogger.trace("Error parsing DELIVERY_MAX_AGE", e);
231 failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO"));
232 } catch (Exception e) {
233 eelfLogger.trace("Error parsing DELIVERY_RETRY_RATIO", e);
236 deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS"));
237 } catch (Exception e) {
238 eelfLogger.trace("Error parsing DELIVERY_THREADS", e);
241 fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT"));
242 } catch (Exception e) {
243 eelfLogger.trace("Error parsing FAIR_FILE_LIMIT", e);
246 fairtimelimit = (long) (Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000);
247 } catch (Exception e) {
248 eelfLogger.trace("Error parsing FAIR_TIME_LIMIT", e);
251 fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0;
252 } catch (Exception e) {
253 eelfLogger.trace("Error parsing FREE_DISK_RED_PERCENT", e);
256 fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0;
257 } catch (Exception e) {
258 eelfLogger.trace("Error parsing FREE_DISK_YELLOW_PERCENT", e);
260 if (fdpstart < 0.01) {
263 if (fdpstart > 0.5) {
266 if (fdpstop < fdpstart) {
274 private void fetchNodeConfigFromProv() {
276 eelfLogger.debug("NodeConfigMan.fetchNodeConfigFromProv: provurl:: {}", provurl);
277 URL url = new URL(provurl);
278 Reader reader = new InputStreamReader(url.openStream());
279 nodeConfig = new NodeConfig(new ProvData(reader), myname, spooldir, extHttpsPort, nak);
281 configtasks.startRun();
283 } catch (Exception e) {
284 NodeUtils.setIpAndFqdnForEelf("fetchNodeConfigFromProv");
285 eelfLogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString());
286 eelfLogger.error("NODE0306 Configuration failed {} - try again later", e);
291 private void runTasks() {
293 while ((rr = configtasks.next()) != null) {
296 } catch (Exception e) {
297 eelfLogger.error("NODE0518 Exception fetchconfig: " + e);
303 * Process a gofetch request from a particular IP address. If the IP address is not an IP address we would go to to
304 * fetch the provisioning data, ignore the request. If the data has been fetched very recently (default 10
305 * seconds), wait a while before fetching again.
307 synchronized void gofetch(String remoteAddr) {
308 if (provcheck.isReachable(remoteAddr)) {
309 eelfLogger.debug("NODE0307 Received configuration fetch request from provisioning server " + remoteAddr);
312 eelfLogger.debug("NODE0308 Received configuration fetch request from unexpected server " + remoteAddr);
319 public boolean isConfigured() {
320 return nodeConfig != null;
326 boolean isShutdown() {
327 return quiesce.exists();
331 * Given a routing string, get the targets.
333 * @param routing Target string
334 * @return array of targets
336 Target[] parseRouting(String routing) {
337 return nodeConfig.parseRouting(routing);
341 * Given a set of credentials and an IP address, is this request from another node.
343 * @param credentials Credentials offered by the supposed node
344 * @param ip IP address the request came from
345 * @return If the credentials and IP address are recognized, true, otherwise false.
347 boolean isAnotherNode(String credentials, String ip) {
348 return nodeConfig.isAnotherNode(credentials, ip);
352 * Check whether publication is allowed.
354 * @param feedid The ID of the feed being requested
355 * @param credentials The offered credentials
356 * @param ip The requesting IP address
357 * @return True if the IP and credentials are valid for the specified feed.
359 String isPublishPermitted(String feedid, String credentials, String ip) {
360 return nodeConfig.isPublishPermitted(feedid, credentials, ip);
364 * Check whether delete file is allowed.
366 * @param subId The ID of the subscription being requested
367 * @return True if the delete file is permitted for the subscriber.
369 boolean isDeletePermitted(String subId) {
370 return nodeConfig.isDeletePermitted(subId);
374 * Check who the user is given the feed ID and the offered credentials.
376 * @param feedid The ID of the feed specified
377 * @param credentials The offered credentials
378 * @return Null if the credentials are invalid or the user if they are valid.
380 String getAuthUser(String feedid, String credentials) {
381 return nodeConfig.getAuthUser(feedid, credentials);
385 * Check if the publish request should be sent to another node based on the feedid, user, and source IP address.
387 * @param feedid The ID of the feed specified
388 * @param user The publishing user
389 * @param ip The IP address of the publish endpoint
390 * @return Null if the request should be accepted or the correct hostname if it should be sent to another node.
392 String getIngressNode(String feedid, String user, String ip) {
393 return nodeConfig.getIngressNode(feedid, user, ip);
397 * Get a provisioned configuration parameter (from the provisioning server configuration).
399 * @param name The name of the parameter
400 * @return The value of the parameter or null if it is not defined.
402 private String getProvParam(String name) {
403 return nodeConfig.getProvParam(name);
407 * Get a provisioned configuration parameter (from the provisioning server configuration).
409 * @param name The name of the parameter
410 * @param defaultValue The value to use if the parameter is not defined
411 * @return The value of the parameter or deflt if it is not defined.
413 private String getProvParam(String name, String defaultValue) {
414 name = nodeConfig.getProvParam(name);
422 * Generate a publish ID.
424 public String getPublishId() {
425 return publishId.next();
429 * Get all the outbound spooling destinations. This will include both subscriptions and nodes.
431 public DestInfo[] getAllDests() {
432 return nodeConfig.getAllDests();
436 * Register a task to run whenever the configuration changes.
438 public void registerConfigTask(Runnable task) {
439 configtasks.addTask(task);
443 * Deregister a task to run whenever the configuration changes.
445 void deregisterConfigTask(Runnable task) {
446 configtasks.removeTask(task);
450 * Get the URL to deliver a message to.
452 * @param destinationInfo The destination information
453 * @param fileid The file ID
454 * @return The URL to deliver to
456 public String getDestURL(DestInfo destinationInfo, String fileid) {
457 String subid = destinationInfo.getSubId();
458 String purl = destinationInfo.getURL();
459 if (followredirects && subid != null) {
460 purl = rdmgr.lookup(subid, purl);
462 return (purl + "/" + fileid);
466 * Set up redirection on receipt of a 3XX from a target URL.
468 public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) {
469 fileid = "/" + fileid;
470 String subid = destinationInfo.getSubId();
471 String purl = destinationInfo.getURL();
472 if (followredirects && subid != null && redirto.endsWith(fileid)) {
473 redirto = redirto.substring(0, redirto.length() - fileid.length());
474 if (!redirto.equals(purl)) {
475 rdmgr.redirect(subid, purl, redirto);
483 * Handle unreachable target URL.
485 public void handleUnreachable(DestInfo destinationInfo) {
486 String subid = destinationInfo.getSubId();
487 if (followredirects && subid != null) {
493 * Get the timeout before retrying after an initial delivery failure.
495 public long getInitFailureTimer() {
496 return initfailuretimer;
500 * Get the timeout before retrying after delivery and wait for file processing.
502 public long getWaitForFileProcessFailureTimer() {
503 return waitForFileProcessFailureTimer;
507 * Get the maximum timeout between delivery attempts.
509 public long getMaxFailureTimer() {
510 return maxfailuretimer;
514 * Get the ratio between consecutive delivery attempts.
516 public double getFailureBackoff() {
517 return failurebackoff;
521 * Get the expiration timer for deliveries.
523 public long getExpirationTimer() {
524 return expirationtimer;
528 * Get the maximum number of file delivery attempts before checking if another queue has work to be performed.
530 public int getFairFileLimit() {
531 return fairfilelimit;
535 * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed.
537 public long getFairTimeLimit() {
538 return fairtimelimit;
542 * Get the targets for a feed.
544 * @param feedid The feed ID
545 * @return The targets this feed should be delivered to
547 Target[] getTargets(String feedid) {
548 return nodeConfig.getTargets(feedid);
552 * Get the spool directory for temporary files.
554 String getSpoolDir() {
555 return spooldir + "/f";
559 * Get the spool directory for a subscription.
561 String getSpoolDir(String subid, String remoteaddr) {
562 if (provcheck.isFrom(remoteaddr)) {
563 String sdir = nodeConfig.getSpoolDir(subid);
565 eelfLogger.debug("NODE0310 Received subscription reset request for subscription " + subid
566 + " from provisioning server " + remoteaddr);
568 eelfLogger.debug("NODE0311 Received subscription reset request for unknown subscription " + subid
569 + " from provisioning server " + remoteaddr);
573 eelfLogger.debug("NODE0312 Received subscription reset request from unexpected server " + remoteaddr);
579 * Get the base directory for spool directories.
581 public String getSpoolBase() {
593 * Get the https port.
600 * Get the externally visible https port.
602 int getExtHttpsPort() {
607 * Get the external name of this machine.
609 public String getMyName() {
614 * Get the number of threads to use for delivery.
616 public int getDeliveryThreads() {
617 return deliverythreads;
621 * Get the URL for uploading the event log data.
623 public String getEventLogUrl() {
628 * Get the prefix for the names of event log files.
630 public String getEventLogPrefix() {
631 return eventlogprefix;
635 * Get the suffix for the names of the event log files.
637 public String getEventLogSuffix() {
638 return eventlogsuffix;
642 * Get the interval between event log file rollovers.
644 public String getEventLogInterval() {
645 return eventloginterval;
649 * Should I follow redirects from subscribers.
651 public boolean isFollowRedirects() {
652 return followredirects;
656 * Get the directory where the event and node log files live.
658 public String getLogDir() {
663 * How long do I keep log files (in milliseconds).
665 public long getLogRetention() {
672 public Timer getTimer() {
677 * Get the feed ID for a subscription.
679 * @param subid The subscription ID
680 * @return The feed ID
682 public String getFeedId(String subid) {
683 return nodeConfig.getFeedId(subid);
687 * Get the authorization string this node uses.
689 * @return The Authorization string for this node
691 public String getMyAuth() {
692 return nodeConfig.getMyAuth();
696 * Get the fraction of free spool disk space where we start throwing away undelivered files. This is
697 * FREE_DISK_RED_PERCENT / 100.0. Default is 0.05. Limited by 0.01 <= FreeDiskStart <= 0.5.
699 public double getFreeDiskStart() {
704 * Get the fraction of free spool disk space where we stop throwing away undelivered files. This is
705 * FREE_DISK_YELLOW_PERCENT / 100.0. Default is 0.2. Limited by FreeDiskStart <= FreeDiskStop <= 0.5.
707 public double getFreeDiskStop() {
711 protected boolean isTlsEnabled() {
715 public static NodeTlsManager getNodeTlsManager() {
716 return nodeTlsManager;
720 * Generate publish IDs.
722 static class PublishId {
724 private long nextuid;
725 private final String myname;
728 * Generate publish IDs for the specified name.
730 * @param myname Unique identifier for this publish ID generator (usually fqdn of server)
732 public PublishId(String myname) {
733 this.myname = myname;
737 * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log
738 * correlation purposes.
740 public synchronized String next() {
741 long now = System.currentTimeMillis();
746 return (now + "." + myname);
751 * Manage a list of tasks to be executed when an event occurs. This makes the following guarantees:
753 * <li>Tasks can be safely added and removed in the middle of a run.</li>
754 * <li>No task will be returned more than once during a run.</li>
755 * <li>No task will be returned when it is not, at that moment, in the list of tasks.</li>
756 * <li>At the moment when next() returns null, all tasks on the list have been returned during the run.</li>
757 * <li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is
761 static class TaskList {
763 private Iterator<Runnable> runlist;
764 private final HashSet<Runnable> tasks = new HashSet<>();
765 private HashSet<Runnable> togo;
766 private HashSet<Runnable> sofar;
767 private HashSet<Runnable> added;
768 private HashSet<Runnable> removed;
771 * Start executing the sequence of tasks.
773 synchronized void startRun() {
774 sofar = new HashSet<>();
775 added = new HashSet<>();
776 removed = new HashSet<>();
777 togo = new HashSet<>(tasks);
778 runlist = togo.iterator();
782 * Get the next task to execute.
784 synchronized Runnable next() {
785 while (runlist != null) {
786 if (runlist.hasNext()) {
787 Runnable task = runlist.next();
788 if (addTaskToSoFar(task)) {
792 if (!added.isEmpty()) {
794 added = new HashSet<>();
796 runlist = togo.iterator();
809 * Add a task to the list of tasks to run whenever the event occurs.
811 synchronized void addTask(Runnable task) {
812 if (runlist != null) {
814 removed.remove(task);
820 * Remove a task from the list of tasks to run whenever the event occurs.
822 synchronized void removeTask(Runnable task) {
823 if (runlist != null) {
830 private boolean addTaskToSoFar(Runnable task) {
831 if (removed.contains(task)) {
834 if (sofar.contains(task)) {