d98c47aebed94ee0fe97e7dca6eeb3d06f1a9353
[dmaap/datarouter.git] / datarouter-node / src / main / java / org / onap / dmaap / datarouter / node / NodeConfigManager.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.node;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.File;
30 import java.io.FileInputStream;
31 import java.io.InputStreamReader;
32 import java.io.Reader;
33 import java.net.URL;
34 import java.util.Properties;
35 import java.util.Timer;
36 import org.apache.log4j.Logger;
37 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
38
39
40 /**
41  * Maintain the configuration of a Data Router node
42  * <p>
43  * The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention
44  * subsystems to access configuration information.  (Log4J has its own configuration mechanism).
45  * <p>
46  * There are two basic sets of configuration data.  The static local configuration data, stored in a local configuration
47  * file (created as part of installation by SWM), and the dynamic global configuration data fetched from the data router
48  * provisioning server.
49  */
50 public class NodeConfigManager implements DeliveryQueueHelper {
51
52     private static EELFLogger eelflogger = EELFManager.getInstance()
53             .getLogger(NodeConfigManager.class);
54     private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.NodeConfigManager");
55     private static NodeConfigManager base = new NodeConfigManager();
56
57     private Timer timer = new Timer("Node Configuration Timer", true);
58     private long maxfailuretimer;
59     private long initfailuretimer;
60     private long waitForFileProcessFailureTimer;
61     private long expirationtimer;
62     private double failurebackoff;
63     private long fairtimelimit;
64     private int fairfilelimit;
65     private double fdpstart;
66     private double fdpstop;
67     private int deliverythreads;
68     private String provurl;
69     private String provhost;
70     private IsFrom provcheck;
71     private int gfport;
72     private int svcport;
73     private int port;
74     private String spooldir;
75     private String logdir;
76     private long logretention;
77     private String redirfile;
78     private String kstype;
79     private String ksfile;
80     private String kspass;
81     private String kpass;
82     private String tstype;
83     private String tsfile;
84     private String tspass;
85     private String myname;
86     private RedirManager rdmgr;
87     private RateLimitedOperation pfetcher;
88     private NodeConfig config;
89     private File quiesce;
90     private PublishId pid;
91     private String nak;
92     private TaskList configtasks = new TaskList();
93     private String eventlogurl;
94     private String eventlogprefix;
95     private String eventlogsuffix;
96     private String eventloginterval;
97     private boolean followredirects;
98
99
100     /**
101      * Get the default node configuration manager
102      */
103     public static NodeConfigManager getInstance() {
104         return (base);
105     }
106
107     /**
108      * Initialize the configuration of a Data Router node
109      */
110     private NodeConfigManager() {
111         Properties p = new Properties();
112         try {
113             p.load(new FileInputStream(System
114                     .getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties")));
115         } catch (Exception e) {
116
117             NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
118             eelflogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR);
119             logger.error("NODE0301 Unable to load local configuration file " + System
120                             .getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties"),
121                     e);
122         }
123         provurl = p.getProperty("ProvisioningURL", "https://feeds-drtr.web.att.com/internal/prov");
124         try {
125             provhost = (new URL(provurl)).getHost();
126         } catch (Exception e) {
127             NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
128             eelflogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, provurl);
129             logger.error("NODE0302 Bad provisioning server URL " + provurl);
130             System.exit(1);
131         }
132         logger.info("NODE0303 Provisioning server is " + provhost);
133         eventlogurl = p.getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs");
134         provcheck = new IsFrom(provhost);
135         gfport = Integer.parseInt(p.getProperty("IntHttpPort", "8080"));
136         svcport = Integer.parseInt(p.getProperty("IntHttpsPort", "8443"));
137         port = Integer.parseInt(p.getProperty("ExtHttpsPort", "443"));
138         long minpfinterval = Long.parseLong(p.getProperty("MinProvFetchInterval", "10000"));
139         long minrsinterval = Long.parseLong(p.getProperty("MinRedirSaveInterval", "10000"));
140         spooldir = p.getProperty("SpoolDir", "spool");
141         File fdir = new File(spooldir + "/f");
142         fdir.mkdirs();
143         for (File junk : fdir.listFiles()) {
144             if (junk.isFile()) {
145                 junk.delete();
146             }
147         }
148         logdir = p.getProperty("LogDir", "logs");
149         (new File(logdir)).mkdirs();
150         logretention = Long.parseLong(p.getProperty("LogRetention", "30")) * 86400000L;
151         eventlogprefix = logdir + "/events";
152         eventlogsuffix = ".log";
153         String redirfile = p.getProperty("RedirectionFile", "etc/redirections.dat");
154         kstype = p.getProperty("KeyStoreType", "jks");
155         ksfile = p.getProperty("KeyStoreFile", "etc/keystore");
156         kspass = p.getProperty("KeyStorePassword", "changeme");
157         kpass = p.getProperty("KeyPassword", "changeme");
158         tstype = p.getProperty("TrustStoreType", "jks");
159         tsfile = p.getProperty("TrustStoreFile");
160         tspass = p.getProperty("TrustStorePassword", "changeme");
161         if (tsfile != null && tsfile.length() > 0) {
162             System.setProperty("javax.net.ssl.trustStoreType", tstype);
163             System.setProperty("javax.net.ssl.trustStore", tsfile);
164             System.setProperty("javax.net.ssl.trustStorePassword", tspass);
165         }
166         nak = p.getProperty("NodeAuthKey", "Node123!");
167         quiesce = new File(p.getProperty("QuiesceFile", "etc/SHUTDOWN"));
168         myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass);
169         if (myname == null) {
170             NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
171             eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile);
172             logger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile);
173             System.exit(1);
174         }
175         logger.info("NODE0304 My certificate says my name is " + myname);
176         pid = new PublishId(myname);
177         rdmgr = new RedirManager(redirfile, minrsinterval, timer);
178         pfetcher = new RateLimitedOperation(minpfinterval, timer) {
179             public void run() {
180                 fetchconfig();
181             }
182         };
183         logger.info("NODE0305 Attempting to fetch configuration at " + provurl);
184         pfetcher.request();
185     }
186
187     private void localconfig() {
188         followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
189         eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s");
190         initfailuretimer = 10000;
191         waitForFileProcessFailureTimer = 600000;
192         maxfailuretimer = 3600000;
193         expirationtimer = 86400000;
194         failurebackoff = 2.0;
195         deliverythreads = 40;
196         fairfilelimit = 100;
197         fairtimelimit = 60000;
198         fdpstart = 0.05;
199         fdpstop = 0.2;
200         try {
201             initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000);
202         } catch (Exception e) {
203         }
204         try {
205             waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000);
206         } catch (Exception e) {
207         }
208         try {
209             maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
210         } catch (Exception e) {
211         }
212         try {
213             expirationtimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000);
214         } catch (Exception e) {
215         }
216         try {
217             failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO"));
218         } catch (Exception e) {
219         }
220         try {
221             deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS"));
222         } catch (Exception e) {
223         }
224         try {
225             fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT"));
226         } catch (Exception e) {
227         }
228         try {
229             fairtimelimit = (long) (Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000);
230         } catch (Exception e) {
231         }
232         try {
233             fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0;
234         } catch (Exception e) {
235         }
236         try {
237             fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0;
238         } catch (Exception e) {
239         }
240         if (fdpstart < 0.01) {
241             fdpstart = 0.01;
242         }
243         if (fdpstart > 0.5) {
244             fdpstart = 0.5;
245         }
246         if (fdpstop < fdpstart) {
247             fdpstop = fdpstart;
248         }
249         if (fdpstop > 0.5) {
250             fdpstop = 0.5;
251         }
252     }
253
254     private void fetchconfig() {
255         try {
256             System.out.println("provurl:: " + provurl);
257             Reader r = new InputStreamReader((new URL(provurl)).openStream());
258             config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak);
259             localconfig();
260             configtasks.startRun();
261             Runnable rr;
262             while ((rr = configtasks.next()) != null) {
263                 try {
264                     rr.run();
265                 } catch (Exception e) {
266                 }
267             }
268         } catch (Exception e) {
269             NodeUtils.setIpAndFqdnForEelf("fetchconfigs");
270             eelflogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString());
271             logger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e);
272             pfetcher.request();
273         }
274     }
275
276     /**
277      * Process a gofetch request from a particular IP address.  If the IP address is not an IP address we would go to to
278      * fetch the provisioning data, ignore the request.  If the data has been fetched very recently (default 10
279      * seconds), wait a while before fetching again.
280      */
281     public synchronized void gofetch(String remoteaddr) {
282         if (provcheck.isFrom(remoteaddr)) {
283             logger.info("NODE0307 Received configuration fetch request from provisioning server " + remoteaddr);
284             pfetcher.request();
285         } else {
286             logger.info("NODE0308 Received configuration fetch request from unexpected server " + remoteaddr);
287         }
288     }
289
290     /**
291      * Am I configured?
292      */
293     public boolean isConfigured() {
294         return (config != null);
295     }
296
297     /**
298      * Am I shut down?
299      */
300     public boolean isShutdown() {
301         return (quiesce.exists());
302     }
303
304     /**
305      * Given a routing string, get the targets.
306      *
307      * @param routing Target string
308      * @return array of targets
309      */
310     public Target[] parseRouting(String routing) {
311         return (config.parseRouting(routing));
312     }
313
314     /**
315      * Given a set of credentials and an IP address, is this request from another node?
316      *
317      * @param credentials Credentials offered by the supposed node
318      * @param ip IP address the request came from
319      * @return If the credentials and IP address are recognized, true, otherwise false.
320      */
321     public boolean isAnotherNode(String credentials, String ip) {
322         return (config.isAnotherNode(credentials, ip));
323     }
324
325     /**
326      * Check whether publication is allowed.
327      *
328      * @param feedid The ID of the feed being requested
329      * @param credentials The offered credentials
330      * @param ip The requesting IP address
331      * @return True if the IP and credentials are valid for the specified feed.
332      */
333     public String isPublishPermitted(String feedid, String credentials, String ip) {
334         return (config.isPublishPermitted(feedid, credentials, ip));
335     }
336
337     /**
338      * Check whether delete file is allowed.
339      *
340      * @param subId The ID of the subscription being requested
341      * @return True if the delete file is permitted for the subscriber.
342      */
343     public boolean isDeletePermitted(String subId) {
344         return (config.isDeletePermitted(subId));
345     }
346
347     /**
348      * Check who the user is given the feed ID and the offered credentials.
349      *
350      * @param feedid The ID of the feed specified
351      * @param credentials The offered credentials
352      * @return Null if the credentials are invalid or the user if they are valid.
353      */
354     public String getAuthUser(String feedid, String credentials) {
355         return (config.getAuthUser(feedid, credentials));
356     }
357
358     /**
359      * Check if the publish request should be sent to another node based on the feedid, user, and source IP address.
360      *
361      * @param feedid The ID of the feed specified
362      * @param user The publishing user
363      * @param ip The IP address of the publish endpoint
364      * @return Null if the request should be accepted or the correct hostname if it should be sent to another node.
365      */
366     public String getIngressNode(String feedid, String user, String ip) {
367         return (config.getIngressNode(feedid, user, ip));
368     }
369
370     /**
371      * Get a provisioned configuration parameter (from the provisioning server configuration)
372      *
373      * @param name The name of the parameter
374      * @return The value of the parameter or null if it is not defined.
375      */
376     public String getProvParam(String name) {
377         return (config.getProvParam(name));
378     }
379
380     /**
381      * Get a provisioned configuration parameter (from the provisioning server configuration)
382      *
383      * @param name The name of the parameter
384      * @param defaultValue The value to use if the parameter is not defined
385      * @return The value of the parameter or deflt if it is not defined.
386      */
387     public String getProvParam(String name, String defaultValue) {
388         name = config.getProvParam(name);
389         if (name == null) {
390             name = defaultValue;
391         }
392         return (name);
393     }
394
395     /**
396      * Generate a publish ID
397      */
398     public String getPublishId() {
399         return (pid.next());
400     }
401
402     /**
403      * Get all the outbound spooling destinations. This will include both subscriptions and nodes.
404      */
405     public DestInfo[] getAllDests() {
406         return (config.getAllDests());
407     }
408
409     /**
410      * Register a task to run whenever the configuration changes
411      */
412     public void registerConfigTask(Runnable task) {
413         configtasks.addTask(task);
414     }
415
416     /**
417      * Deregister a task to run whenever the configuration changes
418      */
419     public void deregisterConfigTask(Runnable task) {
420         configtasks.removeTask(task);
421     }
422
423     /**
424      * Get the URL to deliver a message to.
425      *
426      * @param destinationInfo The destination information
427      * @param fileid The file ID
428      * @return The URL to deliver to
429      */
430     public String getDestURL(DestInfo destinationInfo, String fileid) {
431         String subid = destinationInfo.getSubId();
432         String purl = destinationInfo.getURL();
433         if (followredirects && subid != null) {
434             purl = rdmgr.lookup(subid, purl);
435         }
436         return (purl + "/" + fileid);
437     }
438
439     /**
440      * Is a destination redirected?
441      */
442     public boolean isDestRedirected(DestInfo destinfo) {
443         return (followredirects && rdmgr.isRedirected(destinfo.getSubId()));
444     }
445
446     /**
447      * Set up redirection on receipt of a 3XX from a target URL
448      */
449     public boolean handleRedirection(DestInfo destinationInfo, String redirto, String fileid) {
450         fileid = "/" + fileid;
451         String subid = destinationInfo.getSubId();
452         String purl = destinationInfo.getURL();
453         if (followredirects && subid != null && redirto.endsWith(fileid)) {
454             redirto = redirto.substring(0, redirto.length() - fileid.length());
455             if (!redirto.equals(purl)) {
456                 rdmgr.redirect(subid, purl, redirto);
457                 return (true);
458             }
459         }
460         return (false);
461     }
462
463     /**
464      * Handle unreachable target URL
465      */
466     public void handleUnreachable(DestInfo destinationInfo) {
467         String subid = destinationInfo.getSubId();
468         if (followredirects && subid != null) {
469             rdmgr.forget(subid);
470         }
471     }
472
473     /**
474      * Get the timeout before retrying after an initial delivery failure
475      */
476     public long getInitFailureTimer() {
477         return (initfailuretimer);
478     }
479
480     /**
481      * Get the timeout before retrying after delivery and wait for file processing
482      */
483     public long getWaitForFileProcessFailureTimer() {
484         return (waitForFileProcessFailureTimer);
485     }
486
487     /**
488      * Get the maximum timeout between delivery attempts
489      */
490     public long getMaxFailureTimer() {
491         return (maxfailuretimer);
492     }
493
494     /**
495      * Get the ratio between consecutive delivery attempts
496      */
497     public double getFailureBackoff() {
498         return (failurebackoff);
499     }
500
501     /**
502      * Get the expiration timer for deliveries
503      */
504     public long getExpirationTimer() {
505         return (expirationtimer);
506     }
507
508     /**
509      * Get the maximum number of file delivery attempts before checking if another queue has work to be performed.
510      */
511     public int getFairFileLimit() {
512         return (fairfilelimit);
513     }
514
515     /**
516      * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed.
517      */
518     public long getFairTimeLimit() {
519         return (fairtimelimit);
520     }
521
522     /**
523      * Get the targets for a feed
524      *
525      * @param feedid The feed ID
526      * @return The targets this feed should be delivered to
527      */
528     public Target[] getTargets(String feedid) {
529         return (config.getTargets(feedid));
530     }
531
532     /**
533      * Get the spool directory for temporary files
534      */
535     public String getSpoolDir() {
536         return (spooldir + "/f");
537     }
538
539     /**
540      * Get the base directory for spool directories
541      */
542     public String getSpoolBase() {
543         return (spooldir);
544     }
545
546     /**
547      * Get the key store type
548      */
549     public String getKSType() {
550         return (kstype);
551     }
552
553     /**
554      * Get the key store file
555      */
556     public String getKSFile() {
557         return (ksfile);
558     }
559
560     /**
561      * Get the key store password
562      */
563     public String getKSPass() {
564         return (kspass);
565     }
566
567     /**
568      * Get the key password
569      */
570     public String getKPass() {
571         return (kpass);
572     }
573
574     /**
575      * Get the http port
576      */
577     public int getHttpPort() {
578         return (gfport);
579     }
580
581     /**
582      * Get the https port
583      */
584     public int getHttpsPort() {
585         return (svcport);
586     }
587
588     /**
589      * Get the externally visible https port
590      */
591     public int getExtHttpsPort() {
592         return (port);
593     }
594
595     /**
596      * Get the external name of this machine
597      */
598     public String getMyName() {
599         return (myname);
600     }
601
602     /**
603      * Get the number of threads to use for delivery
604      */
605     public int getDeliveryThreads() {
606         return (deliverythreads);
607     }
608
609     /**
610      * Get the URL for uploading the event log data
611      */
612     public String getEventLogUrl() {
613         return (eventlogurl);
614     }
615
616     /**
617      * Get the prefix for the names of event log files
618      */
619     public String getEventLogPrefix() {
620         return (eventlogprefix);
621     }
622
623     /**
624      * Get the suffix for the names of the event log files
625      */
626     public String getEventLogSuffix() {
627         return (eventlogsuffix);
628     }
629
630     /**
631      * Get the interval between event log file rollovers
632      */
633     public String getEventLogInterval() {
634         return (eventloginterval);
635     }
636
637     /**
638      * Should I follow redirects from subscribers?
639      */
640     public boolean isFollowRedirects() {
641         return (followredirects);
642     }
643
644     /**
645      * Get the directory where the event and node log files live
646      */
647     public String getLogDir() {
648         return (logdir);
649     }
650
651     /**
652      * How long do I keep log files (in milliseconds)
653      */
654     public long getLogRetention() {
655         return (logretention);
656     }
657
658     /**
659      * Get the timer
660      */
661     public Timer getTimer() {
662         return (timer);
663     }
664
665     /**
666      * Get the feed ID for a subscription
667      *
668      * @param subid The subscription ID
669      * @return The feed ID
670      */
671     public String getFeedId(String subid) {
672         return (config.getFeedId(subid));
673     }
674
675     /**
676      * Get the authorization string this node uses
677      *
678      * @return The Authorization string for this node
679      */
680     public String getMyAuth() {
681         return (config.getMyAuth());
682     }
683
684     /**
685      * Get the fraction of free spool disk space where we start throwing away undelivered files.  This is
686      * FREE_DISK_RED_PERCENT / 100.0.  Default is 0.05.  Limited by 0.01 <= FreeDiskStart <= 0.5.
687      */
688     public double getFreeDiskStart() {
689         return (fdpstart);
690     }
691
692     /**
693      * Get the fraction of free spool disk space where we stop throwing away undelivered files.  This is
694      * FREE_DISK_YELLOW_PERCENT / 100.0.  Default is 0.2.  Limited by FreeDiskStart <= FreeDiskStop <= 0.5.
695      */
696     public double getFreeDiskStop() {
697         return (fdpstop);
698     }
699
700     /**
701      * Get the spool directory for a subscription
702      */
703     public String getSpoolDir(String subid, String remoteaddr) {
704         if (provcheck.isFrom(remoteaddr)) {
705             String sdir = config.getSpoolDir(subid);
706             if (sdir != null) {
707                 logger.info("NODE0310 Received subscription reset request for subscription " + subid
708                         + " from provisioning server " + remoteaddr);
709             } else {
710                 logger.info("NODE0311 Received subscription reset request for unknown subscription " + subid
711                         + " from provisioning server " + remoteaddr);
712             }
713             return (sdir);
714         } else {
715             logger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr);
716             return (null);
717         }
718     }
719 }