7ecbaafdc302ea056f8187f4a4544d4acc8b89ec
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / NodeConfigManager.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import java.net.*;
28 import java.util.*;
29 import java.io.*;
30
31 import org.apache.log4j.Logger;
32 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
33
34 import com.att.eelf.configuration.EELFLogger;
35 import com.att.eelf.configuration.EELFManager;
36
37
38 /**
39  * Maintain the configuration of a Data Router node
40  * <p>
41  * The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention
42  * subsystems to access configuration information.  (Log4J has its own configuration mechanism).
43  * <p>
44  * There are two basic sets of configuration data.  The static local configuration data, stored in a local configuration
45  * file (created as part of installation by SWM), and the dynamic global configuration data fetched from the data router
46  * provisioning server.
47  */
48 public class NodeConfigManager implements DeliveryQueueHelper {
49
50     private static EELFLogger eelflogger = EELFManager.getInstance()
51         .getLogger("org.onap.dmaap.datarouter.node.NodeConfigManager");
52     private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.NodeConfigManager");
53     private static NodeConfigManager base = new NodeConfigManager();
54
55     private Timer timer = new Timer("Node Configuration Timer", true);
56     private long maxfailuretimer;
57     private long initfailuretimer;
58     private long expirationtimer;
59     private double failurebackoff;
60     private long fairtimelimit;
61     private int fairfilelimit;
62     private double fdpstart;
63     private double fdpstop;
64     private int deliverythreads;
65     private String provurl;
66     private String provhost;
67     private IsFrom provcheck;
68     private int gfport;
69     private int svcport;
70     private int port;
71     private String spooldir;
72     private String logdir;
73     private long logretention;
74     private String redirfile;
75     private String kstype;
76     private String ksfile;
77     private String kspass;
78     private String kpass;
79     private String tstype;
80     private String tsfile;
81     private String tspass;
82     private String myname;
83     private RedirManager rdmgr;
84     private RateLimitedOperation pfetcher;
85     private NodeConfig config;
86     private File quiesce;
87     private PublishId pid;
88     private String nak;
89     private TaskList configtasks = new TaskList();
90     private String eventlogurl;
91     private String eventlogprefix;
92     private String eventlogsuffix;
93     private String eventloginterval;
94     private boolean followredirects;
95
96
97     /**
98      * Get the default node configuration manager
99      */
100     public static NodeConfigManager getInstance() {
101         return (base);
102     }
103
104     /**
105      * Initialize the configuration of a Data Router node
106      */
107     private NodeConfigManager() {
108         Properties p = new Properties();
109         try {
110             p.load(new FileInputStream(System
111                 .getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties")));
112         } catch (Exception e) {
113
114             NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
115             eelflogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR);
116             logger.error("NODE0301 Unable to load local configuration file " + System
117                 .getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties"), e);
118         }
119         provurl = p.getProperty("ProvisioningURL", "https://feeds-drtr.web.att.com/internal/prov");
120         try {
121             provhost = (new URL(provurl)).getHost();
122         } catch (Exception e) {
123             NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
124             eelflogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, provurl);
125             logger.error("NODE0302 Bad provisioning server URL " + provurl);
126             System.exit(1);
127         }
128         logger.info("NODE0303 Provisioning server is " + provhost);
129         eventlogurl = p.getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs");
130         provcheck = new IsFrom(provhost);
131         gfport = Integer.parseInt(p.getProperty("IntHttpPort", "8080"));
132         svcport = Integer.parseInt(p.getProperty("IntHttpsPort", "8443"));
133         port = Integer.parseInt(p.getProperty("ExtHttpsPort", "443"));
134         long minpfinterval = Long.parseLong(p.getProperty("MinProvFetchInterval", "10000"));
135         long minrsinterval = Long.parseLong(p.getProperty("MinRedirSaveInterval", "10000"));
136         spooldir = p.getProperty("SpoolDir", "spool");
137         File fdir = new File(spooldir + "/f");
138         fdir.mkdirs();
139         for (File junk : fdir.listFiles()) {
140             if (junk.isFile()) {
141                 junk.delete();
142             }
143         }
144         logdir = p.getProperty("LogDir", "logs");
145         (new File(logdir)).mkdirs();
146         logretention = Long.parseLong(p.getProperty("LogRetention", "30")) * 86400000L;
147         eventlogprefix = logdir + "/events";
148         eventlogsuffix = ".log";
149         String redirfile = p.getProperty("RedirectionFile", "etc/redirections.dat");
150         kstype = p.getProperty("KeyStoreType", "jks");
151         ksfile = p.getProperty("KeyStoreFile", "etc/keystore");
152         kspass = p.getProperty("KeyStorePassword", "changeme");
153         kpass = p.getProperty("KeyPassword", "changeme");
154         tstype = p.getProperty("TrustStoreType", "jks");
155         tsfile = p.getProperty("TrustStoreFile");
156         tspass = p.getProperty("TrustStorePassword", "changeme");
157         if (tsfile != null && tsfile.length() > 0) {
158             System.setProperty("javax.net.ssl.trustStoreType", tstype);
159             System.setProperty("javax.net.ssl.trustStore", tsfile);
160             System.setProperty("javax.net.ssl.trustStorePassword", tspass);
161         }
162         nak = p.getProperty("NodeAuthKey", "Node123!");
163         quiesce = new File(p.getProperty("QuiesceFile", "etc/SHUTDOWN"));
164         myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass);
165         if (myname == null) {
166             NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
167             eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile);
168             logger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile);
169             System.exit(1);
170         }
171         logger.info("NODE0304 My certificate says my name is " + myname);
172         pid = new PublishId(myname);
173         rdmgr = new RedirManager(redirfile, minrsinterval, timer);
174         pfetcher = new RateLimitedOperation(minpfinterval, timer) {
175             public void run() {
176                 fetchconfig();
177             }
178         };
179         logger.info("NODE0305 Attempting to fetch configuration at " + provurl);
180         pfetcher.request();
181     }
182
183     private void localconfig() {
184         followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
185         eventloginterval = getProvParam("LOGROLL_INTERVAL", "5m");
186         initfailuretimer = 10000;
187         maxfailuretimer = 3600000;
188         expirationtimer = 86400000;
189         failurebackoff = 2.0;
190         deliverythreads = 40;
191         fairfilelimit = 100;
192         fairtimelimit = 60000;
193         fdpstart = 0.05;
194         fdpstop = 0.2;
195         try {
196             initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000);
197         } catch (Exception e) {
198         }
199         try {
200             maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
201         } catch (Exception e) {
202         }
203         try {
204             expirationtimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000);
205         } catch (Exception e) {
206         }
207         try {
208             failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO"));
209         } catch (Exception e) {
210         }
211         try {
212             deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS"));
213         } catch (Exception e) {
214         }
215         try {
216             fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT"));
217         } catch (Exception e) {
218         }
219         try {
220             fairtimelimit = (long) (Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000);
221         } catch (Exception e) {
222         }
223         try {
224             fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0;
225         } catch (Exception e) {
226         }
227         try {
228             fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0;
229         } catch (Exception e) {
230         }
231         if (fdpstart < 0.01) {
232             fdpstart = 0.01;
233         }
234         if (fdpstart > 0.5) {
235             fdpstart = 0.5;
236         }
237         if (fdpstop < fdpstart) {
238             fdpstop = fdpstart;
239         }
240         if (fdpstop > 0.5) {
241             fdpstop = 0.5;
242         }
243     }
244
245     private void fetchconfig() {
246         try {
247             System.out.println("provurl:: " + provurl);
248             Reader r = new InputStreamReader((new URL(provurl)).openStream());
249             config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak);
250             localconfig();
251             configtasks.startRun();
252             Runnable rr;
253             while ((rr = configtasks.next()) != null) {
254                 try {
255                     rr.run();
256                 } catch (Exception e) {
257                 }
258             }
259         } catch (Exception e) {
260             NodeUtils.setIpAndFqdnForEelf("fetchconfigs");
261             eelflogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString());
262             logger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e);
263             pfetcher.request();
264         }
265     }
266
267     /**
268      * Process a gofetch request from a particular IP address.  If the IP address is not an IP address we would go to to
269      * fetch the provisioning data, ignore the request.  If the data has been fetched very recently (default 10
270      * seconds), wait a while before fetching again.
271      */
272     public synchronized void gofetch(String remoteaddr) {
273         if (provcheck.isFrom(remoteaddr)) {
274             logger.info("NODE0307 Received configuration fetch request from provisioning server " + remoteaddr);
275             pfetcher.request();
276         } else {
277             logger.info("NODE0308 Received configuration fetch request from unexpected server " + remoteaddr);
278         }
279     }
280
281     /**
282      * Am I configured?
283      */
284     public boolean isConfigured() {
285         return (config != null);
286     }
287
288     /**
289      * Am I shut down?
290      */
291     public boolean isShutdown() {
292         return (quiesce.exists());
293     }
294
295     /**
296      * Given a routing string, get the targets.
297      *
298      * @param routing Target string
299      * @return array of targets
300      */
301     public Target[] parseRouting(String routing) {
302         return (config.parseRouting(routing));
303     }
304
305     /**
306      * Given a set of credentials and an IP address, is this request from another node?
307      *
308      * @param credentials Credentials offered by the supposed node
309      * @param ip IP address the request came from
310      * @return If the credentials and IP address are recognized, true, otherwise false.
311      */
312     public boolean isAnotherNode(String credentials, String ip) {
313         return (config.isAnotherNode(credentials, ip));
314     }
315
316     /**
317      * Check whether publication is allowed.
318      *
319      * @param feedid The ID of the feed being requested
320      * @param credentials The offered credentials
321      * @param ip The requesting IP address
322      * @return True if the IP and credentials are valid for the specified feed.
323      */
324     public String isPublishPermitted(String feedid, String credentials, String ip) {
325         return (config.isPublishPermitted(feedid, credentials, ip));
326     }
327
328     /**
329      * Check who the user is given the feed ID and the offered credentials.
330      *
331      * @param feedid The ID of the feed specified
332      * @param credentials The offered credentials
333      * @return Null if the credentials are invalid or the user if they are valid.
334      */
335     public String getAuthUser(String feedid, String credentials) {
336         return (config.getAuthUser(feedid, credentials));
337     }
338
339     /**
340      * Check if the publish request should be sent to another node based on the feedid, user, and source IP address.
341      *
342      * @param feedid The ID of the feed specified
343      * @param user The publishing user
344      * @param ip The IP address of the publish endpoint
345      * @return Null if the request should be accepted or the correct hostname if it should be sent to another node.
346      */
347     public String getIngressNode(String feedid, String user, String ip) {
348         return (config.getIngressNode(feedid, user, ip));
349     }
350
351     /**
352      * Get a provisioned configuration parameter (from the provisioning server configuration)
353      *
354      * @param name The name of the parameter
355      * @return The value of the parameter or null if it is not defined.
356      */
357     public String getProvParam(String name) {
358         return (config.getProvParam(name));
359     }
360
361     /**
362      * Get a provisioned configuration parameter (from the provisioning server configuration)
363      *
364      * @param name The name of the parameter
365      * @param deflt The value to use if the parameter is not defined
366      * @return The value of the parameter or deflt if it is not defined.
367      */
368     public String getProvParam(String name, String deflt) {
369         name = config.getProvParam(name);
370         if (name == null) {
371             name = deflt;
372         }
373         return (name);
374     }
375
376     /**
377      * Generate a publish ID
378      */
379     public String getPublishId() {
380         return (pid.next());
381     }
382
383     /**
384      * Get all the outbound spooling destinations. This will include both subscriptions and nodes.
385      */
386     public DestInfo[] getAllDests() {
387         return (config.getAllDests());
388     }
389
390     /**
391      * Register a task to run whenever the configuration changes
392      */
393     public void registerConfigTask(Runnable task) {
394         configtasks.addTask(task);
395     }
396
397     /**
398      * Deregister a task to run whenever the configuration changes
399      */
400     public void deregisterConfigTask(Runnable task) {
401         configtasks.removeTask(task);
402     }
403
404     /**
405      * Get the URL to deliver a message to.
406      *
407      * @param destinfo The destination information
408      * @param fileid The file ID
409      * @return The URL to deliver to
410      */
411     public String getDestURL(DestInfo destinfo, String fileid) {
412         String subid = destinfo.getSubId();
413         String purl = destinfo.getURL();
414         if (followredirects && subid != null) {
415             purl = rdmgr.lookup(subid, purl);
416         }
417         return (purl + "/" + fileid);
418     }
419
420     /**
421      * Is a destination redirected?
422      */
423     public boolean isDestRedirected(DestInfo destinfo) {
424         return (followredirects && rdmgr.isRedirected(destinfo.getSubId()));
425     }
426
427     /**
428      * Set up redirection on receipt of a 3XX from a target URL
429      */
430     public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) {
431         fileid = "/" + fileid;
432         String subid = destinfo.getSubId();
433         String purl = destinfo.getURL();
434         if (followredirects && subid != null && redirto.endsWith(fileid)) {
435             redirto = redirto.substring(0, redirto.length() - fileid.length());
436             if (!redirto.equals(purl)) {
437                 rdmgr.redirect(subid, purl, redirto);
438                 return (true);
439             }
440         }
441         return (false);
442     }
443
444     /**
445      * Handle unreachable target URL
446      */
447     public void handleUnreachable(DestInfo destinfo) {
448         String subid = destinfo.getSubId();
449         if (followredirects && subid != null) {
450             rdmgr.forget(subid);
451         }
452     }
453
454     /**
455      * Get the timeout before retrying after an initial delivery failure
456      */
457     public long getInitFailureTimer() {
458         return (initfailuretimer);
459     }
460
461     /**
462      * Get the maximum timeout between delivery attempts
463      */
464     public long getMaxFailureTimer() {
465         return (maxfailuretimer);
466     }
467
468     /**
469      * Get the ratio between consecutive delivery attempts
470      */
471     public double getFailureBackoff() {
472         return (failurebackoff);
473     }
474
475     /**
476      * Get the expiration timer for deliveries
477      */
478     public long getExpirationTimer() {
479         return (expirationtimer);
480     }
481
482     /**
483      * Get the maximum number of file delivery attempts before checking if another queue has work to be performed.
484      */
485     public int getFairFileLimit() {
486         return (fairfilelimit);
487     }
488
489     /**
490      * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed.
491      */
492     public long getFairTimeLimit() {
493         return (fairtimelimit);
494     }
495
496     /**
497      * Get the targets for a feed
498      *
499      * @param feedid The feed ID
500      * @return The targets this feed should be delivered to
501      */
502     public Target[] getTargets(String feedid) {
503         return (config.getTargets(feedid));
504     }
505
506     /**
507      * Get the spool directory for temporary files
508      */
509     public String getSpoolDir() {
510         return (spooldir + "/f");
511     }
512
513     /**
514      * Get the base directory for spool directories
515      */
516     public String getSpoolBase() {
517         return (spooldir);
518     }
519
520     /**
521      * Get the key store type
522      */
523     public String getKSType() {
524         return (kstype);
525     }
526
527     /**
528      * Get the key store file
529      */
530     public String getKSFile() {
531         return (ksfile);
532     }
533
534     /**
535      * Get the key store password
536      */
537     public String getKSPass() {
538         return (kspass);
539     }
540
541     /**
542      * Get the key password
543      */
544     public String getKPass() {
545         return (kpass);
546     }
547
548     /**
549      * Get the http port
550      */
551     public int getHttpPort() {
552         return (gfport);
553     }
554
555     /**
556      * Get the https port
557      */
558     public int getHttpsPort() {
559         return (svcport);
560     }
561
562     /**
563      * Get the externally visible https port
564      */
565     public int getExtHttpsPort() {
566         return (port);
567     }
568
569     /**
570      * Get the external name of this machine
571      */
572     public String getMyName() {
573         return (myname);
574     }
575
576     /**
577      * Get the number of threads to use for delivery
578      */
579     public int getDeliveryThreads() {
580         return (deliverythreads);
581     }
582
583     /**
584      * Get the URL for uploading the event log data
585      */
586     public String getEventLogUrl() {
587         return (eventlogurl);
588     }
589
590     /**
591      * Get the prefix for the names of event log files
592      */
593     public String getEventLogPrefix() {
594         return (eventlogprefix);
595     }
596
597     /**
598      * Get the suffix for the names of the event log files
599      */
600     public String getEventLogSuffix() {
601         return (eventlogsuffix);
602     }
603
604     /**
605      * Get the interval between event log file rollovers
606      */
607     public String getEventLogInterval() {
608         return (eventloginterval);
609     }
610
611     /**
612      * Should I follow redirects from subscribers?
613      */
614     public boolean isFollowRedirects() {
615         return (followredirects);
616     }
617
618     /**
619      * Get the directory where the event and node log files live
620      */
621     public String getLogDir() {
622         return (logdir);
623     }
624
625     /**
626      * How long do I keep log files (in milliseconds)
627      */
628     public long getLogRetention() {
629         return (logretention);
630     }
631
632     /**
633      * Get the timer
634      */
635     public Timer getTimer() {
636         return (timer);
637     }
638
639     /**
640      * Get the feed ID for a subscription
641      *
642      * @param subid The subscription ID
643      * @return The feed ID
644      */
645     public String getFeedId(String subid) {
646         return (config.getFeedId(subid));
647     }
648
649     /**
650      * Get the authorization string this node uses
651      *
652      * @return The Authorization string for this node
653      */
654     public String getMyAuth() {
655         return (config.getMyAuth());
656     }
657
658     /**
659      * Get the fraction of free spool disk space where we start throwing away undelivered files.  This is
660      * FREE_DISK_RED_PERCENT / 100.0.  Default is 0.05.  Limited by 0.01 <= FreeDiskStart <= 0.5.
661      */
662     public double getFreeDiskStart() {
663         return (fdpstart);
664     }
665
666     /**
667      * Get the fraction of free spool disk space where we stop throwing away undelivered files.  This is
668      * FREE_DISK_YELLOW_PERCENT / 100.0.  Default is 0.2.  Limited by FreeDiskStart <= FreeDiskStop <= 0.5.
669      */
670     public double getFreeDiskStop() {
671         return (fdpstop);
672     }
673
674     /**
675      * Get the spool directory for a subscription
676      */
677     public String getSpoolDir(String subid, String remoteaddr) {
678         if (provcheck.isFrom(remoteaddr)) {
679             String sdir = config.getSpoolDir(subid);
680             if (sdir != null) {
681                 logger.info("NODE0310 Received subscription reset request for subscription " + subid
682                     + " from provisioning server " + remoteaddr);
683             } else {
684                 logger.info("NODE0311 Received subscription reset request for unknown subscription " + subid
685                     + " from provisioning server " + remoteaddr);
686             }
687             return (sdir);
688         } else {
689             logger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr);
690             return (null);
691         }
692     }
693 }